diff options
Diffstat (limited to 'protocols/ace/RMCast')
24 files changed, 852 insertions, 178 deletions
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile index ae13792c4c4..e97cd885493 100644 --- a/protocols/ace/RMCast/Makefile +++ b/protocols/ace/RMCast/Makefile @@ -636,17 +636,11 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - RMCast_Export.h RMCast.i RMCast_Module.i \ - $(ACE_ROOT)/ace/RB_Tree.h \ - $(ACE_ROOT)/ace/Functor.h \ + RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \ + RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \ + $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/Functor.i \ - $(ACE_ROOT)/ace/Functor_T.h \ - $(ACE_ROOT)/ace/Functor_T.i \ - $(ACE_ROOT)/ace/Functor_T.cpp \ - $(ACE_ROOT)/ace/RB_Tree.i \ - $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/Synch.i \ $(ACE_ROOT)/ace/Synch_T.h \ $(ACE_ROOT)/ace/Event_Handler.h \ @@ -664,6 +658,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Log_Record.h \ $(ACE_ROOT)/ace/Log_Priority.h \ $(ACE_ROOT)/ace/Log_Record.i \ + RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \ + $(ACE_ROOT)/ace/RB_Tree.h \ + $(ACE_ROOT)/ace/Functor.h \ + $(ACE_ROOT)/ace/Functor.i \ + $(ACE_ROOT)/ace/Functor_T.h \ + $(ACE_ROOT)/ace/Functor_T.i \ + $(ACE_ROOT)/ace/Functor_T.cpp \ + $(ACE_ROOT)/ace/RB_Tree.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h index abf2a24e946..df3a0d48858 100644 --- a/protocols/ace/RMCast/RMCast.h +++ b/protocols/ace/RMCast/RMCast.h @@ -233,16 +233,19 @@ public: * * This message is used to provide feedback information to senders. * It contains two sequence numbers: - * - highest_in_sequence: is the sequence number of the last message - * received without any lost messages before it - * - highest_received: is the sequence number of the last_message - * successfully received, there may be some messages lost before it + * - \param next_expected: is the sequence number of the next message + * expected, i.e. (next_expected-1) is the last message received + * without any losses before it. + * - \param highest_received: is the highest sequence number among + * all the messages successfully received. + * In other words, all messages lost (if any) are in the range: + * [next_expected,highest_received) * * <CODE> * +---------+----------------------+<BR> * | 8 bits | MT_ACK |<BR> * +---------+----------------------+<BR> - * | 32 bits | highest_in_sequence |<BR> + * | 32 bits | next_expected |<BR> * +---------+----------------------+<BR> * | 32 bits | highest_received |<BR> * +---------+----------------------+<BR> @@ -251,7 +254,7 @@ public: struct Ack { //! The last message received without any losses before it. - ACE_UINT32 highest_in_sequence; + ACE_UINT32 next_expected; //! The last message successfully received ACE_UINT32 highest_received; diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp new file mode 100644 index 00000000000..f1553c7f4ab --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp @@ -0,0 +1,176 @@ +// $Id$ + +#ifndef ACE_RMCAST_COPY_ON_WRITE_CPP +#define ACE_RMCAST_COPY_ON_WRITE_CPP + +#include "RMCast_Copy_On_Write.h" + +#if ! defined (__ACE_INLINE__) +#include "RMCast_Copy_On_Write.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(RMCast, RMCast_Copy_On_Write, "$Id$") + +template<class COLLECTION, class ITERATOR> void +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_incr_refcnt (void) +{ + // LOCKING: no locking is required, the caller grabs the mutex. + this->refcount_++; +} + +template<class COLLECTION, class ITERATOR> void +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void) +{ + // LOCKING: no locking is required, the caller grabs the mutex. + { + this->refcount_--; + if (this->refcount_ != 0) + return; + } + //@@ TODO: If this wrapper is going to be completely general some + // kind of functor has to be provided to remove the elements in the + // collection, in case the are no self-managed + + delete this; +} + +// **************************************************************** + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write (void) + : pending_writes_ (0) + , writing_ (0) + , cond_ (mutex_) +{ + ACE_NEW (this->collection_, Collection); +} + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write (void) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + + while (this->pending_writes_ != 0) + this->cond_.wait (); + + this->collection_->_decr_refcnt (); + this->collection_ = 0; +} + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + for_each (ACE_RMCast_Worker<KEY,ITEM> *worker) +{ + Read_Guard ace_mon (this->mutex_, this->collection_); + + ITERATOR end = ace_mon.collection->collection.end (); + for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i) + { + int r = worker->work ((*i).key (), (*i).item ()); + if (r != 0) + return r; + } + return 0; +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, + ITEM const & i) +{ + Write_Guard ace_mon (this->mutex_, + this->cond_, + this->pending_writes_, + this->writing_, + this->collection_); + + return this->bind_i (ace_mon, k, i); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k) +{ + Write_Guard ace_mon (this->mutex_, + this->cond_, + this->pending_writes_, + this->writing_, + this->collection_); + + return this->unbind_i (ace_mon, k); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon, + KEY const & k, + ITEM const & i) +{ + return ace_mon.copy->collection.bind (k, i); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon, + KEY const & k) +{ + return ace_mon.copy->collection.unbind (k); +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m, + ACE_SYNCH_CONDITION &c, + int &p, + int &w, + Collection*& cr) + : copy (0) + , mutex (m) + , cond (c) + , pending_writes (p) + , writing_flag (w) + , collection (cr) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); + + this->pending_writes++; + + while (this->writing_flag != 0) + this->cond.wait (); + + this->writing_flag = 1; + } + + // Copy outside the mutex, because it may take a long time. + // Nobody can change it, because it is protected by the + // writing_flag. + + // First initialize it (with the correct reference count + ACE_NEW (this->copy, Collection); + // Copy the contents + this->copy->collection = this->collection->collection; +} + +template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write_Write_Guard (void) +{ + Collection *tmp = 0; + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); + + tmp = this->collection; + this->collection = this->copy; + this->writing_flag = 0; + this->pending_writes--; + + this->cond.signal (); + } + // Delete outside the mutex, because it may take a long time. + tmp->_decr_refcnt (); +} + +// **************************************************************** + +#endif /* ACE_RMCAST_COPY_ON_WRITE_CPP */ diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h new file mode 100644 index 00000000000..8724e23a5d5 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h @@ -0,0 +1,173 @@ +/* -*- C++ -*- */ +// $Id$ +// + +#ifndef ACE_RMCAST_COPY_ON_WRITE_H +#define ACE_RMCAST_COPY_ON_WRITE_H +#include "ace/pre.h" + +#include "RMCast_Worker.h" +#include "ace/Synch.h" + +//! A wrapper to implement reference counted collections +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Collection +{ +public: + //! Constructor + ACE_RMCast_Copy_On_Write_Collection (void); + + //! Increment the reference count + void _incr_refcnt (void); + + //! Decrement the reference count + void _decr_refcnt (void); + + //! The actual collection + COLLECTION collection; + +private: + //! The reference count + ACE_UINT32 refcount_; +}; + +// **************************************************************** + +//! Implement a read guard for a reference counted collection +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Read_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex, + Collection *&collection); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Read_Guard (void); + + //! A reference to the collection + Collection *collection; + +private: + //! Synchronization + ACE_SYNCH_MUTEX &mutex_; +}; + +// **************************************************************** + +//! Implement the write guard for a reference counted collecion +/*! + * This helper class atomically increments the reference count of a + * ACE_RMCast_Copy_On_Write_Collection and reads the current + * collection in the Copy_On_Write class. + */ +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Write_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex, + ACE_SYNCH_CONDITION &cond, + int &pending_writes, + int &writing_flag, + Collection*& collection); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Write_Guard (void); + + //! The collection + Collection *copy; + +private: + //! Keep a reference to the mutex + ACE_SYNCH_MUTEX &mutex; + + //! Keep a reference to the condition variable + ACE_SYNCH_CONDITION &cond; + + //! Use a reference to update the pending writes count + int &pending_writes; + + //! Use a reference to update the writing flag + int &writing_flag; + + //! Use this reference to update the collection once the + //! modifications are finished. + Collection *&collection; +}; + +// **************************************************************** + +//! Implement a copy on write wrapper for a map-like collection +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write +{ +public: + //! The Read_Guard trait + typedef ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR> Read_Guard; + + //! The Write_Guard trait + typedef ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR> Write_Guard; + + //! The underlying collection type + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write (void); + + //! Destructor + ~ACE_RMCast_Copy_On_Write (void); + + //! Iterate over all the elements invoking \param worker on each one. + int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker); + + //! Add a new element + int bind (KEY const & key, ITEM const & item); + + //! Remove an element + int unbind (KEY const & key); + + //! Bind assuming the Write_Guard is held + int bind_i (Write_Guard &guard, KEY const & key, ITEM const & item); + + //! Unbind assuming the Write_Guard is held + int unbind_i (Write_Guard &guard, KEY const & key); + + //! Number of pending writes + int pending_writes_; + + //! If non-zero then a thread is changing the collection. + /*! + * Many threads can use the collection simulatenously, but only one + * change it. + */ + int writing_; + + //! A mutex to serialize access to the collection pointer. + ACE_SYNCH_MUTEX mutex_; + + //! A condition variable to wait to synchronize multiple writers. + ACE_SYNCH_CONDITION cond_; + + //! The collection, with reference counting added + Collection *collection_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Copy_On_Write.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Copy_On_Write.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Copy_On_Write.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_COPY_ON_WRITE_H */ diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i new file mode 100644 index 00000000000..c6e5099cda5 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i @@ -0,0 +1,36 @@ +// $Id$ + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Collection (void) + : refcount_ (1) +{ +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m, + Collection*& collection_ref) + : collection (0) + , mutex_ (m) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->collection = collection_ref; + this->collection->_incr_refcnt (); +} + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write_Read_Guard (void) +{ + if (this->collection != 0) + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->collection->_decr_refcnt (); + } +} + +// **************************************************************** + diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h index 7b64d763ebc..eed08c92517 100644 --- a/protocols/ace/RMCast/RMCast_Fragment.h +++ b/protocols/ace/RMCast/RMCast_Fragment.h @@ -1,15 +1,5 @@ // $Id$ -// ============================================================================ -// -// = DESCRIPTION -// The fragmentation task for the reliable multicast library -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - #ifndef ACE_RMCAST_FRAGMENT_H #define ACE_RMCAST_FRAGMENT_H #include "ace/pre.h" @@ -21,28 +11,46 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +//! Default fragment size #ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE # define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024 #endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */ +//! Fragmentation module +/*! + * Some transports cannot send very big messages, for example UDP + * imposes a limit of 64K, and in practice the limit is even more + * strict than that. + * This class decomposes a message into multiple fragments, using an + * application defined maximum size. + */ class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_RMCast_Module { public: + //! Constructor ACE_RMCast_Fragment (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Fragment (void); - // Destructor + //! Accessor for the max_fragment size. + /*! There is no modifier, the maximum fragment size is obtained + * using feedback from the lower layers (transport?) + * @@TODO We have not implemented the feedback mechanisms yet! + */ size_t max_fragment_size (void) const; - // Accessor for the max_fragment size. - // There is no modifier, the maximum fragment size is obtained using - // feedback from the lower layer (transport?) - // = The ACE_RMCast_Module methods + /*! + * Only data messages need fragmentation, the control messages are + * all small enough for all the transports that I know about. + * Well, actually for CAN-Bus (Controller Area Network), they may be + * too big, because the max payload there is 8 bytes, but we don't + * play with those in ACE. + */ virtual int data (ACE_RMCast::Data &data); private: + //! Current fragment size limit size_t max_fragment_size_; }; diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp index af655f3130f..421982d5ad6 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.cpp +++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp @@ -354,7 +354,7 @@ ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack, char header[16]; header[0] = ACE_RMCast::MT_ACK; - ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence); + ACE_UINT32 tmp = ACE_HTONL (ack.next_expected); ACE_OS::memcpy (header + 1, &tmp, sizeof(ACE_UINT32)); tmp = ACE_HTONL (ack.highest_received); diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.h b/protocols/ace/RMCast/RMCast_IO_UDP.h index bdcccabe6e1..5af403bf994 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.h +++ b/protocols/ace/RMCast/RMCast_IO_UDP.h @@ -33,44 +33,65 @@ class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module { public: + //! Constructor + /*! + * The <factory> argument is used to create the modules for each + * proxy that process incoming messages. The class does *not* assume + * ownership of <factory>, the caller owns it. But it does assume + * ownership of the modules returned by the factory, and it may ask + * the factory to release them eventually. + */ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory); - // Constructor - // <factory> is used to create the modules for each proxy that - // process incoming messages. The class does *not* assume ownership - // of <factory>, the caller owns it. + //! Destructor ~ACE_RMCast_IO_UDP (void); - // Destructor + //! Join a new multicast group + /*! + * Start receiving data for the <mcast_addr> multicast group. + * Please read the documentation of ACE_SOCK_Dgram_Mcast for more + * details. + */ int subscribe (const ACE_INET_Addr &mcast_addr, int reuse_addr = 1, const ACE_TCHAR *net_if = 0, int protocol_family = PF_INET, int protocol = 0); - // Start receiving data for the <mcast_addr> multicast group. - // Please read the documentation of <ACE_SOCK_Dgram_Mcast> for more - // details. // The class can be used with a Reactor or using blocking I/O // depending on what method of the following two is called. + //! Wait for events for the period <tv>. If <tv> is zero it blocks + //! forever. int handle_events (ACE_Time_Value *tv = 0); - // Wait for events for the period <tv>. If <tv> is zero it blocks - // forever. + //! Register any event handlers into <reactor> + /*! + * @@TODO: This should be left for the clients of the class, there + * is no reason why this class must know about reactors. + */ int register_handlers (ACE_Reactor *reactor); - // Register any event handlers into <reactor> + //! Remove all the handlers from the reactor + /*! + * @@TODO: This should be left for the clients of the class, there + * is no reason why this class must know about reactors. + */ int remove_handlers (void); - // Remove all the handlers from the reactor + //! There is data to read, read it and process it. int handle_input (ACE_HANDLE h); - // There is data to read, read it and process it. + //! Obtain the handle for the underlying socket ACE_HANDLE get_handle (void) const; - // Obtain the handle for the underlying socket - // Send back to the remove object represented by <proxy> + //@{ + //! Send the message to the ACE_INET_Addr argument. + /*! + * These methods are used in the implementation of the + * ACE_RMCast_UDP_Proxy objects and the implementation of the + * inherited ACE_RMCast_Module methods in this class. + */ int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &); int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &); int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &); @@ -78,8 +99,9 @@ public: int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &); int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &); int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &); + //@} - // = The RMCast_Module methods + // Please read the documentation in ACE_RMCast_Module for more details virtual int data (ACE_RMCast::Data &); virtual int poll (ACE_RMCast::Poll &); virtual int ack_join (ACE_RMCast::Ack_Join &); @@ -87,23 +109,24 @@ public: virtual int ack (ACE_RMCast::Ack &); virtual int join (ACE_RMCast::Join &); virtual int leave (ACE_RMCast::Leave &); - // The messages are sent to the multicast group private: + //! The factory used to create the modules attached to each proxy ACE_RMCast_Module_Factory *factory_; - // The factory used to create the modules attached to each proxy + //! The multicast group we subscribe and send to ACE_INET_Addr mcast_group_; - // The multicast group we subscribe and send to + //! The socket used to receive and send data ACE_SOCK_Dgram_Mcast dgram_; - // The socket + //! Use a Hash_Map to maintain the collection of proxies typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map; + //! The collection of proxies Map map_; + //! The event handler adapter ACE_RMCast_UDP_Event_Handler eh_; - // The event handler adapter }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Membership.cpp b/protocols/ace/RMCast/RMCast_Membership.cpp index 6ee2690a41f..a23d7a756e5 100644 --- a/protocols/ace/RMCast/RMCast_Membership.cpp +++ b/protocols/ace/RMCast/RMCast_Membership.cpp @@ -28,14 +28,14 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) ACE_RMCast::Ack next_ack; { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - if (ack.highest_in_sequence < this->highest_in_sequence_) + if (ack.next_expected < this->next_expected_) { // @@ This violates an invariant of the class, shouldn't // happen... // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n")); return -1; } - else if (ack.highest_in_sequence == this->highest_in_sequence_) + else if (ack.next_expected == this->next_expected_) { // Nothing new, just continue.... // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n")); @@ -43,21 +43,23 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) } // Possible update, re-evaluate the story... - ACE_UINT32 highest_in_sequence = (*i)->highest_in_sequence (); + ACE_UINT32 next_expected = (*i)->next_expected (); ACE_UINT32 highest_received = (*i)->highest_received (); ++i; for (; i != end; ++i) { - ACE_UINT32 s = (*i)->highest_in_sequence (); - if (s < highest_in_sequence) - highest_in_sequence = s; + ACE_UINT32 s = (*i)->next_expected (); + if (s < next_expected) + next_expected = s; ACE_UINT32 r = (*i)->highest_received (); if (r > highest_received) highest_received = r; } #if 0 - if (this->highest_in_sequence_ >= highest_in_sequence + // @@TODO: this is an important feature, disabled until it is + // fully debugged + if (this->next_expected_ >= next_expected || this->highest_received_ >= highest_received) { // No change.... @@ -65,12 +67,12 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) return 0; } #endif /* 0 */ - this->highest_in_sequence_ = highest_in_sequence; + this->next_expected_ = next_expected; this->highest_received_ = highest_received; if (this->next () == 0) return 0; next_ack.source = ack.source; - next_ack.highest_in_sequence = this->highest_in_sequence_; + next_ack.next_expected = this->next_expected_; next_ack.highest_received = this->highest_received_; } // @@ This looks like a race condition, next() is checked inside the @@ -89,6 +91,8 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join) ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); if (this->proxies_.insert (join.source) == -1) return -1; + // @@TODO: This may change the next Ack to send up, should + // recompute and send the right message if that was the case. } return this->ACE_RMCast_Module::join (join); @@ -103,6 +107,8 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave) { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); (void) this->proxies_.remove (leave.source); + // @@TODO: This may change the next Ack to send up, should + // recompute and send the right message if that was the case. } return this->ACE_RMCast_Module::leave (leave); diff --git a/protocols/ace/RMCast/RMCast_Membership.h b/protocols/ace/RMCast/RMCast_Membership.h index a99a7752507..21ee1bea97b 100644 --- a/protocols/ace/RMCast/RMCast_Membership.h +++ b/protocols/ace/RMCast/RMCast_Membership.h @@ -28,41 +28,59 @@ class ACE_RMCast_Proxy; +//! Track peer membership +/*! + * Reliable senders of events need to know exactly how many peers are + * receiving the events, and how many events has each peer received so + * far. + * This class uses the Join, Leave and Ack messages to build that + * information, it also summarizes the Ack events and propagate only + * the global info to the upper layer. + */ class ACE_RMCast_Export ACE_RMCast_Membership : public ACE_RMCast_Module { - // = TITLE - // Track Receiver membership - // - // = DESCRIPTION - // Define the interface for all reliable multicast membership public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Membership (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Membership (void); - // Destructor - // = The RMCast_Module methods + //! Receive an process an Ack message + /*! + * After receiving the Ack message we find out what is the lowest + * sequence number received in order among all the acks received by + * the proxies in the collection. We also find out what is the + * highest sequence number received by any proxy. + * We only propagate that information back to the upper layer, and + * then only if there are any news since the last Ack. + */ virtual int ack (ACE_RMCast::Ack &); + + //! Add a new member to the collection, using the <source> field in + //! the Join message virtual int join (ACE_RMCast::Join &); + + //! Remove a member from the collection, using the <source> field in + //! the Join message virtual int leave (ACE_RMCast::Leave &); protected: + //! Use an unbounded set to maintain the collection of proxies. typedef ACE_Unbounded_Set<ACE_RMCast_Proxy*> Proxy_Collection; typedef ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*> Proxy_Iterator; + //! The collection of proxies Proxy_Collection proxies_; - // The membership buffer - ACE_UINT32 highest_in_sequence_; - // The smallest value of <highest_in_sequence> for all the proxies + //! The smallest value of \param next_expected for all the proxies + ACE_UINT32 next_expected_; + //! The highest value of \param highest_received for all the proxies ACE_UINT32 highest_received_; - // The highest value of <highest_received> for all the proxies + //! Synchronization ACE_SYNCH_MUTEX mutex_; - // Synchronization }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Membership.i b/protocols/ace/RMCast/RMCast_Membership.i index 0c3e33c2d01..b513c2d5141 100644 --- a/protocols/ace/RMCast/RMCast_Membership.i +++ b/protocols/ace/RMCast/RMCast_Membership.i @@ -2,7 +2,7 @@ ACE_INLINE ACE_RMCast_Membership::ACE_RMCast_Membership (void) - : highest_in_sequence_ (0) + : next_expected_ (0) , highest_received_ (0) { } diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h index dc4077fa4ab..fad76caac53 100644 --- a/protocols/ace/RMCast/RMCast_Module.h +++ b/protocols/ace/RMCast/RMCast_Module.h @@ -36,55 +36,55 @@ class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_Module { public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Module (void); - //!< Constructor + //! Destructor virtual ~ACE_RMCast_Module (void); - //!< Destructor + //! Modifier for the next element in the stack virtual int next (ACE_RMCast_Module *next); - //!< Modifier for the next element in the stack + //! Accesor for the next element in the stack virtual ACE_RMCast_Module* next (void) const; - //!< Accesor for the next element in the stack + //! Modifier for the previous element in the stack virtual int prev (ACE_RMCast_Module *prev); - //!< Modifier for the previous element in the stack + //! Accesor for the previous element in the stack virtual ACE_RMCast_Module* prev (void) const; - //!< Accesor for the previous element in the stack + //! Initialize the module, setting up the next module virtual int open (void); - //!< Initialize the module, setting up the next module + //! Close the module. virtual int close (void); - //!< Close the module. + //! Push data through the stack virtual int data (ACE_RMCast::Data &); - //!< Push data through the stack + //! Push a polling request through the stack virtual int poll (ACE_RMCast::Poll &); - //!< Push a polling request through the stack + //! Push a message to ack a join request through the stack virtual int ack_join (ACE_RMCast::Ack_Join &); - //!< Push a message to ack a join request through the stack + //! Push a message to ack a leave request through the stack virtual int ack_leave (ACE_RMCast::Ack_Leave &); - //!< Push a message to ack a leave request through the stack + //! Push an ack mesage through the stack virtual int ack (ACE_RMCast::Ack &); - //!< Push an ack mesage through the stack + //! Push a join message through the stack virtual int join (ACE_RMCast::Join &); - //!< Push a join message through the stack + //! Push a leave message through the stack virtual int leave (ACE_RMCast::Leave &); - //!< Push a leave message through the stack private: //! The next element in the stack ACE_RMCast_Module *next_; + //! The previous element in the stack ACE_RMCast_Module *prev_; }; diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h index 722ad87d678..f0ea58df0e5 100644 --- a/protocols/ace/RMCast/RMCast_Module_Factory.h +++ b/protocols/ace/RMCast/RMCast_Module_Factory.h @@ -27,19 +27,40 @@ class ACE_RMCast_Module; class ACE_RMCast_IO_UDP; +//! Create Module stacks +/*! + * Different application will probably require different + * configurations in their Module stack, some will just want best + * effort semantics. Others will use Reliable communication with a + * maximum retransmission time. Furthermore, applications may want to + * receive messages in send order, or just as soon as they are + * received. + * Obviously most applications will want to change want happens once a + * message is completely received. + * + * To achieve all this flexibility the IO layer uses this factory to + * create the full stack of Modules corresponding to a single + * consumer. + * To keep the complexity under control the intention is to create + * helper Factories, such as Reliable_Module_Factory where + * applications only need to customize a few features. + */ class ACE_RMCast_Export ACE_RMCast_Module_Factory { - // = DESCRIPTION - // public: + //! Destructor virtual ~ACE_RMCast_Module_Factory (void); - // Destructor + //! Create a new proxy virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0; - // Create a new proxy + //! Destroy a proxy + /*! + * Some factories may allocate modules from a pool, or return the + * same module for all proxies. Consequently, only the factory + * knows how to destroy them. + */ virtual void destroy (ACE_RMCast_Module *) = 0; - // Destroy a proxy }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.h b/protocols/ace/RMCast/RMCast_Partial_Message.h index 9b71eb4a541..88fa9ab2f1a 100644 --- a/protocols/ace/RMCast/RMCast_Partial_Message.h +++ b/protocols/ace/RMCast/RMCast_Partial_Message.h @@ -26,44 +26,72 @@ #define ACE_RMCAST_DEFAULT_HOLE_COUNT 16 #endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */ +//! Represent a partially received message in the +//! ACE_RMCast_Reassembly module +/*! + * This class provides temporary storage for the fragments as they are + * received in the ACE_RMCast_Reassembly module. It also keeps track + * of what portions of the message are still missing. + */ class ACE_RMCast_Export ACE_RMCast_Partial_Message { public: + //! Constructor, reserve enough memory for the complete message ACE_RMCast_Partial_Message (ACE_UINT32 message_size); + + //! Destructor ~ACE_RMCast_Partial_Message (void); + //! Process a fragment + /*! + * A fragment starting at <offset> has been received, copy the + * fragment contents and update the list of holes. + */ int fragment_received (ACE_UINT32 message_size, ACE_UINT32 offset, ACE_Message_Block *mb); + + //! Return 1 if the message is complete int is_complete (void) const; + //! Return the body of the message, the memory is *not* owned by the + //! caller ACE_Message_Block *message_body (void); - // Return the body of the message, the memory is owned by the - // class. private: + //! Insert a new hole into the list + /*! + * The class keeps an array to represent the missing portions of the + * message. This method inserts a new hole, i.e. a new element in + * the array at index <i>. The <start> and <end> arguments represent + * the offsets of the missing portion of the message. + */ int insert_hole (size_t i, ACE_UINT32 start, ACE_UINT32 end); - // Insert a new hole into the list + //! Remove a hole from the list int remove_hole (size_t i); - // Remove a hole from the list private: + //! Maintain the message storage ACE_Message_Block message_body_; - // Used to rebuild the body of the message + //! Represent a missing portion of a message struct Hole { + //! Offset where the missing portion of the message starts ACE_UINT32 start; + //! Offset where the missing portion of the message ends ACE_UINT32 end; }; + //! Implement a growing array of Hole structures + //@{ Hole *hole_list_; size_t max_hole_count_; size_t hole_count_; - // The current list of holes in the message_body. + //@} }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp index 53d9d0b6726..f6b2bbec5e5 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_Proxy.cpp @@ -15,9 +15,9 @@ ACE_RMCast_Proxy::~ACE_RMCast_Proxy (void) } ACE_UINT32 -ACE_RMCast_Proxy::highest_in_sequence (void) const +ACE_RMCast_Proxy::next_expected (void) const { - return this->highest_in_sequence_; + return this->next_expected_; } ACE_UINT32 @@ -29,7 +29,7 @@ ACE_RMCast_Proxy::highest_received (void) const int ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack) { - this->highest_in_sequence_ = ack.highest_in_sequence; + this->next_expected_ = ack.next_expected; this->highest_received_ = ack.highest_received; return this->ACE_RMCast_Module::ack (ack); } diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h index 414b74174fb..e0e6afe79b1 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.h +++ b/protocols/ace/RMCast/RMCast_Proxy.h @@ -48,27 +48,28 @@ public: //! Destructor virtual ~ACE_RMCast_Proxy (void); - - //! Return the highest sequence number received without any losses - //! before it. Only applies to remote receiver proxies. + + //! Return the next sequence number expected by the peer. Only + //! applies to remote receiver proxies. /*! - Please read the documentation in ACE_RMCast::Ack + * Please read the documentation in ACE_RMCast::Ack */ - virtual ACE_UINT32 highest_in_sequence (void) const; + virtual ACE_UINT32 next_expected (void) const; //! Return the highest sequence number successfully received. //! Only applies to remote receiver proxies. /*! - Please read the documentation in ACE_RMCast::Ack + * Please read the documentation in ACE_RMCast::Ack */ virtual ACE_UINT32 highest_received (void) const; //@{ //! Send messages directly to the peer. - /*! Send a message directly to the peer, i.e. the message is not - sent through the multicast group and it may not be processed by - all the layers in the stack. - */ + /*! + * Send a message directly to the peer, i.e. the message is not + * sent through the multicast group and it may not be processed by + * all the layers in the stack. + */ virtual int reply_data (ACE_RMCast::Data &) = 0; virtual int reply_poll (ACE_RMCast::Poll &) = 0; virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0; @@ -79,8 +80,8 @@ public: //@} /*! - Proxies process the ACK sequence numbers to save the sequence - numbers reported from the remote peer. + * Proxies process the ACK sequence numbers to cache the ack + * information from the peer. */ virtual int ack (ACE_RMCast::Ack &); @@ -88,7 +89,7 @@ private: //@{ //! Cache the sequence numbers reported from the remote peer using //! Ack messages - ACE_UINT32 highest_in_sequence_; + ACE_UINT32 next_expected_; ACE_UINT32 highest_received_; //@} }; diff --git a/protocols/ace/RMCast/RMCast_Proxy.i b/protocols/ace/RMCast/RMCast_Proxy.i index f93feaa5639..6fee09fe9e5 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.i +++ b/protocols/ace/RMCast/RMCast_Proxy.i @@ -2,7 +2,7 @@ ACE_INLINE ACE_RMCast_Proxy::ACE_RMCast_Proxy (void) - : highest_in_sequence_ (0) + : next_expected_ (0) , highest_received_ (0) { } diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp index a996e1204d5..7e38cdf7c97 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.cpp +++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp @@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void) { } +class ACE_RMCast_Resend_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next, + ACE_UINT32 max_sequence_number) + : n (0) + , next_ (next) + , max_sequence_number_ (max_sequence_number) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const & item) + { + if (key > this->max_sequence_number_) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::resend - message %d resent\n", + key)); + ACE_RMCast::Data data = item; + int r = this->next_->data (data); + if (r != 0) + return r; + n++; + return 0; + } + + int n; + +private: + ACE_RMCast_Module *next_; + + ACE_UINT32 max_sequence_number_; +}; + int -ACE_RMCast_Retransmission::close (void) +ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number) { - Messages_Iterator end = this->messages_.end (); + if (this->next () == 0) + return 0; - for (Messages_Iterator i = this->messages_.begin (); - i != end; - ++i) - { - ACE_Message_Block::release ((*i).item ().payload); - } - this->messages_.close (); + ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number); + + if (this->messages_.for_each (&worker) == -1) + return -1; + + return worker.n; +} + +int +ACE_RMCast_Retransmission::close (void) +{ + // @@ return 0; } @@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data) int r = this->next ()->data (data); if (r == 0) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ACE_RMCast::Data copy = data; copy.payload = ACE_Message_Block::duplicate (data.payload); r = this->messages_.bind (data.sequence_number, copy); @@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; ACE_RMCast::Ack_Join ack_join; +#if 0 + // @@ { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); Messages_Iterator end = this->messages_.end (); Messages_Iterator begin = this->messages_.begin (); @@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) ack_join.next_sequence_number = (*begin).key (); } } +#endif (void) join.source->reply_ack_join (ack_join); // @@ We should force a full retransmission of all the messages! @@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; } +class ACE_RMCast_Ack_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack, + ACE_RMCast_Retransmission::Messages::Write_Guard &g, + ACE_RMCast_Retransmission::Messages *messages) + : ack_ (ack) + , ace_mon_ (g) + , messages_ (messages) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const &) + { + if (key >= this->ack_.next_expected) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::ack - message %d erased\n", + key)); + return this->messages_->unbind_i (this->ace_mon_, key); + } + +private: + ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&); + ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&); + +private: + ACE_RMCast::Ack &ack_; + + ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_; + + ACE_RMCast_Retransmission::Messages *messages_; +}; + int ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - for (Messages_Iterator i = this->messages_.begin (); - i != this->messages_.end (); - /* do nothing */) - { - ACE_UINT32 key = (*i).key (); - if (key > ack.highest_in_sequence) - break; - this->messages_.unbind (key); - } - return 0; + Messages::Write_Guard ace_mon (this->messages_.mutex_, + this->messages_.cond_, + this->messages_.pending_writes_, + this->messages_.writing_, + this->messages_.collection_); + + ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_); + + return this->messages_.for_each (&worker); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) @@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; +template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>; + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_Retransmission.h b/protocols/ace/RMCast/RMCast_Retransmission.h index 7c586fe5dd6..b7bc20d2914 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.h +++ b/protocols/ace/RMCast/RMCast_Retransmission.h @@ -19,6 +19,7 @@ #include "ace/pre.h" #include "RMCast_Module.h" +#include "RMCast_Copy_On_Write.h" #include "ace/RB_Tree.h" #include "ace/Synch.h" @@ -26,38 +27,72 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +//! Store messages for retransmission in reliable configurations +/*! + * Reliable configurations of the RMCast framework need to store + * messages on the sender side to resend them if one or more clients + * do not receive them successfully. + */ class ACE_RMCast_Export ACE_RMCast_Retransmission : public ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Retransmission - // - // = DESCRIPTION - // Define the interface for all reliable multicast retransmission public: // = Initialization and termination methods. + //! Constructor ACE_RMCast_Retransmission (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Retransmission (void); - // Destructor - // = The RMCast_Module methods + //! Use a Red-Black Tree to keep the queue of messages + typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection; + typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection_Iterator; + + //! The messages are stored in the Copy_On_Write wrapper to provide + //! an efficient, but thread safe interface. + typedef ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,Collection,Collection_Iterator> Messages; + + //! Resend messages + /*! + * Resends all the messages up to \param max_sequence_number + * Returns the number of messages sent, or -1 if there where any + * errors. + */ + int resend (ACE_UINT32 max_sequence_number); + + //! Cleanup all the stored messages virtual int close (void); + + //! Pass the message downstream, but also save it in the + //! retransmission queue + /*! + * Sequence number are assigned by the ACE_RMCast_Fragmentation + * class, consequently this class first passes the message + * downstream, to obtain the sequence number and then stores the + * message for later retransmission. + */ virtual int data (ACE_RMCast::Data &data); + + //! Process an Ack message from the remote receivers. + /*! + * Normally this Ack message will be a summary of all the Ack + * messages received by the ACE_RMCast_Membership class + */ virtual int ack (ACE_RMCast::Ack &); + + //! Detect when new members join the group and Ack_Join them + /*! + * When a new receiver joins the group this module sends an Ack_Join + * message with the next sequence number that the receiver should + * expect. + * The sequence number is obtained from the current list of cached + * messages. + */ virtual int join (ACE_RMCast::Join &); protected: - typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> - Messages; - typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> - Messages_Iterator; + //! The retransmission buffer Messages messages_; - // The retransmission buffer - - ACE_SYNCH_MUTEX mutex_; - // Synchronization }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h index 02798cee7f8..2a6e7c45d42 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h +++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h @@ -1,16 +1,5 @@ // $Id$ -// ============================================================================ -// -// = DESCRIPTION -// Implement an adapter between the ACE Reactor and the -// ACE_RMCast_IO_UDP -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - #ifndef ACE_RMCAST_UDP_EVENT_HANDLER_H #define ACE_RMCAST_UDP_EVENT_HANDLER_H #include "ace/pre.h" @@ -25,24 +14,41 @@ class ACE_RMCast_IO_UDP; class ACE_INET_Addr; +//! Implement an Adapter for the ACE_RMCast_IO_UDP class +/*! + * Applications may wish to use the ACE_Reactor to demultiplex I/O + * events for an ACE_RMCast_IO_UDP object. However other application + * may choose to make ACE_RMCast_IO_UDP active, or they may dedicate + * their own threads for its events. + * To avoid couplin ACE_RMCast_IO_UDP with the Reactor we don't make + * it derived from ACE_Event_Handler or any other class in the Reactor + * framework, instead, this simple Adapter can forward the Reactor + * messages to an ACE_RMCast_IO_UDP object. + */ class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler { public: - ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver); - // Constructor - + //! Constructor, save io_udp as the Adaptee in the Adapter pattern. + ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io_udp); + + //! Destructor + /*! + * Notice that this class does not own the ACE_RMCast_IO_UDP + * adaptee, so it does not destroy it. + */ ~ACE_RMCast_UDP_Event_Handler (void); - // Destructor - // = The Event_Handler methods + //@{ + //! Documented in ACE_Event_Handler class virtual ACE_HANDLE get_handle (void) const; virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); virtual int handle_timeout (const ACE_Time_Value ¤t_time, const void *act = 0); + //@} private: + //! The adaptee ACE_RMCast_IO_UDP *io_udp_; - // The sender }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp index 010267b0cbb..2eb0983b171 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp @@ -123,7 +123,7 @@ ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size) ACE_OS::memcpy (&tmp, buffer + 1, sizeof(tmp)); - ack.highest_in_sequence = ACE_NTOHL (tmp); + ack.next_expected = ACE_NTOHL (tmp); ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32), sizeof(tmp)); ack.highest_received = ACE_NTOHL (tmp); @@ -175,4 +175,3 @@ ACE_RMCast_UDP_Proxy::reply_leave (ACE_RMCast::Leave &leave) { return this->io_udp_->send_leave (leave, this->peer_addr_); } - diff --git a/protocols/ace/RMCast/RMCast_Worker.cpp b/protocols/ace/RMCast/RMCast_Worker.cpp new file mode 100644 index 00000000000..06254b8c0f6 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Worker.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#ifndef ACE_RMCAST_WORKER_CPP +#define ACE_RMCAST_WORKER_CPP + +#include "RMCast_Worker.h" + +#if ! defined (__ACE_INLINE__) +#include "RMCast_Worker.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(RMCast, RMCast_Worker, "$Id$") + +template<class KEY, class ITEM> +ACE_RMCast_Worker<KEY,ITEM>::~ACE_RMCast_Worker (void) +{ +} + +#endif /* ACE_RMCAST_WORKER_CPP */ diff --git a/protocols/ace/RMCast/RMCast_Worker.h b/protocols/ace/RMCast/RMCast_Worker.h new file mode 100644 index 00000000000..d3eb3032ebc --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Worker.h @@ -0,0 +1,36 @@ +/* -*- C++ -*- */ +// $Id$ +// + +#ifndef ACE_RMCAST_WORKER_H +#define ACE_RMCAST_WORKER_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class KEY, class ITEM> +class ACE_RMCast_Worker +{ +public: + virtual ~ACE_RMCast_Worker (void); + + virtual int work (KEY const & key, + ITEM const & item) = 0; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Worker.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Worker.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Worker.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_RMCAST_WORKER_H */ diff --git a/protocols/ace/RMCast/RMCast_Worker.i b/protocols/ace/RMCast/RMCast_Worker.i new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Worker.i @@ -0,0 +1 @@ +// $Id$ |