diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-04 00:10:30 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-04 00:10:30 +0000 |
commit | 545131fddba871c7f71562c5234b92b1c6dc7df0 (patch) | |
tree | 9c3c15f5b079379f81cb5f7ba56feb809f264c0f /protocols/ace/RMCast | |
parent | b5011a72f7f744aab8a7001fb07bc1eeaa2faf03 (diff) | |
download | ATCD-545131fddba871c7f71562c5234b92b1c6dc7df0.tar.gz |
ChangeLogTag:Tue Oct 3 17:07:37 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols/ace/RMCast')
-rw-r--r-- | protocols/ace/RMCast/Makefile | 98 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast.h | 2 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Copy_On_Write.cpp | 56 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Copy_On_Write.h | 77 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Copy_On_Write.i | 1 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reordering.cpp | 153 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reordering.h | 94 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reordering.i | 8 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Retransmission.cpp | 19 |
9 files changed, 442 insertions, 66 deletions
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile index e97cd885493..868795fa82c 100644 --- a/protocols/ace/RMCast/Makefile +++ b/protocols/ace/RMCast/Makefile @@ -18,6 +18,7 @@ FILES= \ RMCast_Proxy \ RMCast_Membership \ RMCast_Retransmission \ + RMCast_Reordering \ \ RMCast_IO_UDP \ RMCast_UDP_Event_Handler \ @@ -710,6 +711,103 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Message_Block_T.i \ $(ACE_ROOT)/ace/Message_Block_T.cpp +.obj/RMCast_Reordering.o .obj/RMCast_Reordering.so .shobj/RMCast_Reordering.o .shobj/RMCast_Reordering.so: RMCast_Reordering.cpp RMCast_Reordering.h \ + $(ACE_ROOT)/ace/pre.h \ + RMCast_Module.h RMCast.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/post.h \ + $(ACE_ROOT)/ace/ACE_export.h \ + $(ACE_ROOT)/ace/svc_export.h \ + $(ACE_ROOT)/ace/ace_wchar.h \ + $(ACE_ROOT)/ace/OS_Dirent.h \ + $(ACE_ROOT)/ace/OS_Export.h \ + $(ACE_ROOT)/ace/OS_Dirent.inl \ + $(ACE_ROOT)/ace/OS_String.h \ + $(ACE_ROOT)/ace/OS_String.inl \ + $(ACE_ROOT)/ace/OS_Memory.h \ + $(ACE_ROOT)/ace/OS_Memory.inl \ + $(ACE_ROOT)/ace/OS_TLI.h \ + $(ACE_ROOT)/ace/OS_TLI.inl \ + $(ACE_ROOT)/ace/Min_Max.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/OS.i \ + RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \ + RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Synch_T.cpp \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \ + $(ACE_ROOT)/ace/RB_Tree.h \ + $(ACE_ROOT)/ace/Functor.h \ + $(ACE_ROOT)/ace/Functor.i \ + $(ACE_ROOT)/ace/Functor_T.h \ + $(ACE_ROOT)/ace/Functor_T.i \ + $(ACE_ROOT)/ace/Functor_T.cpp \ + $(ACE_ROOT)/ace/RB_Tree.i \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.i \ + $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ + $(ACE_ROOT)/ace/Based_Pointer_Repository.h \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Singleton.cpp \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Managed_Object.cpp \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Free_List.cpp \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Malloc_T.cpp \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Containers_T.h \ + $(ACE_ROOT)/ace/Containers_T.i \ + $(ACE_ROOT)/ace/Containers_T.cpp \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/RB_Tree.cpp \ + RMCast_Reordering.i RMCast_Proxy.h RMCast_Proxy.i \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/Message_Block_T.h \ + $(ACE_ROOT)/ace/Message_Block_T.i \ + $(ACE_ROOT)/ace/Message_Block_T.cpp + .obj/RMCast_IO_UDP.o .obj/RMCast_IO_UDP.so .shobj/RMCast_IO_UDP.o .shobj/RMCast_IO_UDP.so: RMCast_IO_UDP.cpp RMCast_IO_UDP.h \ $(ACE_ROOT)/ace/pre.h \ RMCast_Module.h RMCast.h \ diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h index df3a0d48858..55e9b5d0368 100644 --- a/protocols/ace/RMCast/RMCast.h +++ b/protocols/ace/RMCast/RMCast.h @@ -207,7 +207,7 @@ public: */ struct Ack_Join { - ACE_INT32 next_sequence_number; + ACE_UINT32 next_sequence_number; //! Pass the proxy source between layers ACE_RMCast_Proxy *source; diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp index f1553c7f4ab..20a76e559e0 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp @@ -39,11 +39,8 @@ ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void) template<class KEY, class ITEM, class COLLECTION, class ITERATOR> ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: ACE_RMCast_Copy_On_Write (void) - : pending_writes_ (0) - , writing_ (0) - , cond_ (mutex_) + : ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> () { - ACE_NEW (this->collection_, Collection); } template<class KEY, class ITEM, class COLLECTION, class ITERATOR> @@ -69,8 +66,10 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i) { int r = worker->work ((*i).key (), (*i).item ()); - if (r != 0) - return r; + if (r == 1) + return 0; // Abort loop, but no error + if (r == -1) + return -1; } return 0; } @@ -79,11 +78,7 @@ template<class KEY, class ITEM, class C, class I> int ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, ITEM const & i) { - Write_Guard ace_mon (this->mutex_, - this->cond_, - this->pending_writes_, - this->writing_, - this->collection_); + Write_Guard ace_mon (*this); return this->bind_i (ace_mon, k, i); } @@ -91,19 +86,15 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, template<class KEY, class ITEM, class C, class I> int ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k) { - Write_Guard ace_mon (this->mutex_, - this->cond_, - this->pending_writes_, - this->writing_, - this->collection_); + Write_Guard ace_mon (*this); return this->unbind_i (ace_mon, k); } template<class KEY, class ITEM, class C, class I> int ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon, - KEY const & k, - ITEM const & i) + KEY const & k, + ITEM const & i) { return ace_mon.copy->collection.bind (k, i); } @@ -118,18 +109,25 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon, // **************************************************************** template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>::ACE_RMCast_Copy_On_Write_Container (void) + : pending_writes_ (0) + , writing_ (0) + , cond_ (mutex_) +{ + ACE_NEW (this->collection_, Collection); +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: - ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m, - ACE_SYNCH_CONDITION &c, - int &p, - int &w, - Collection*& cr) + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container) : copy (0) - , mutex (m) - , cond (c) - , pending_writes (p) - , writing_flag (w) - , collection (cr) + , mutex (container.mutex_) + , cond (container.cond_) + , pending_writes (container.pending_writes_) + , writing_flag (container.writing_) + , collection (container.collection_) { { ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); @@ -168,6 +166,8 @@ ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: this->cond.signal (); } // Delete outside the mutex, because it may take a long time. + // @@ Is this right? What happens if several readers are still + // using the old copy? tmp->_decr_refcnt (); } diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h index 8724e23a5d5..e06aa5f1689 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.h +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h @@ -57,6 +57,47 @@ private: // **************************************************************** +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Write_Guard; + +//! Base class for the Copy_On_Write collection, used to simplify the +//! declaration of the Write_Guard +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Container +{ +public: + //! Constructor + ACE_RMCast_Copy_On_Write_Container (void); + + //! Let the Write_Guard access the internal fields. + friend ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>; + + //! A shorter name for the actual collection type + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + +protected: + //! Number of pending writes + int pending_writes_; + + //! If non-zero then a thread is changing the collection. + /*! + * Many threads can use the collection simulatenously, but only one + * change it. + */ + int writing_; + + //! A mutex to serialize access to the collection pointer. + ACE_SYNCH_MUTEX mutex_; + + //! A condition variable to wait to synchronize multiple writers. + ACE_SYNCH_CONDITION cond_; + + //! The collection, with reference counting added + Collection *collection_; +}; + +// **************************************************************** + //! Implement the write guard for a reference counted collecion /*! * This helper class atomically increments the reference count of a @@ -70,11 +111,7 @@ public: typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; //! Constructor - ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex, - ACE_SYNCH_CONDITION &cond, - int &pending_writes, - int &writing_flag, - Collection*& collection); + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container); //! Destructor ~ACE_RMCast_Copy_On_Write_Write_Guard (void); @@ -100,11 +137,20 @@ private: Collection *&collection; }; + // **************************************************************** //! Implement a copy on write wrapper for a map-like collection +/* + * + * <B>WARNING: </B> This class may be moved away in the future, I'm + * investigating how it could be converted into a reusable component + * in ACE. I won't make promises on when will that happen, but I + * won't promise that it will stay here either. + * + */ template<class KEY, class ITEM, class COLLECTION, class ITERATOR> -class ACE_RMCast_Copy_On_Write +class ACE_RMCast_Copy_On_Write : public ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> { public: //! The Read_Guard trait @@ -136,25 +182,6 @@ public: //! Unbind assuming the Write_Guard is held int unbind_i (Write_Guard &guard, KEY const & key); - - //! Number of pending writes - int pending_writes_; - - //! If non-zero then a thread is changing the collection. - /*! - * Many threads can use the collection simulatenously, but only one - * change it. - */ - int writing_; - - //! A mutex to serialize access to the collection pointer. - ACE_SYNCH_MUTEX mutex_; - - //! A condition variable to wait to synchronize multiple writers. - ACE_SYNCH_CONDITION cond_; - - //! The collection, with reference counting added - Collection *collection_; }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i index c6e5099cda5..354dd51bf5a 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.i +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i @@ -33,4 +33,3 @@ ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: } // **************************************************************** - diff --git a/protocols/ace/RMCast/RMCast_Reordering.cpp b/protocols/ace/RMCast/RMCast_Reordering.cpp new file mode 100644 index 00000000000..f63c6c66f34 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reordering.cpp @@ -0,0 +1,153 @@ +// +// $Id$ +// + +#include "RMCast_Reordering.h" +#include "RMCast_Proxy.h" +#include "ace/Message_Block.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_Reordering.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Reordering, "$Id$") + +ACE_RMCast_Reordering::~ACE_RMCast_Reordering (void) +{ +} + +int +ACE_RMCast_Reordering::close (void) +{ + // @@ + return 0; +} + +int +ACE_RMCast_Reordering::data (ACE_RMCast::Data &data) +{ + int must_ack = 0; + ACE_RMCast::Ack ack; + + // ACE_DEBUG ((LM_DEBUG, "Received message (%d)\n", data.sequence_number)); + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + + if (data.sequence_number < this->next_expected_) + { + // Old message. Ack with the current status (look at the end + // of this block). + must_ack = 1; + + // ACE_DEBUG ((LM_DEBUG, ".... old message is ignored\n")); + } + + else if (data.sequence_number == this->next_expected_) + { + // ACE_DEBUG ((LM_DEBUG, ".... message is in order, received\n")); + + // Accept the message, the current thread will dispatch it, so + // it is marked as accepted (using the <next_expected> field). + // Any other thread will not push that message because now it + // is "old". + + this->next_expected_++; + + // Right message, process as many messages as possible from + // the queue, then ack the right level... + + // NOTE: we cannot release the mutex while dispatching + // events, otherwise: how do we stop other threads from + // delivering messages out of order? I.E. what if the + // next thread receives the next message? + if (this->next () != 0) + (void) this->next ()->data (data); + + // After delivering one message there may be more messages + // pending + this->push_queued_messages (); + + //@@ This should be strategized, for example, only Ack if + // there is a message out of order or something, otherwise + // continue with happiness. That works well for "optimistic + // models". + must_ack = 1; + } + + else + { + // ACE_DEBUG ((LM_DEBUG, ".... message out of sequence, saved\n")); + + // Out of sequence. + if (this->highest_received_ < data.sequence_number) + { + this->highest_received_ = data.sequence_number; + } + (void) this->messages_.bind (data.sequence_number, data); + // re-ack, otherwise save it and ack. + } + + ack.next_expected = this->next_expected_; + ack.highest_received = this->highest_received_; + } + + if (!must_ack || data.source == 0) + return 0; + return data.source->reply_ack (ack); +} + +int +ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join) +{ + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + if (this->next_expected_ >= ack_join.next_sequence_number) + { + // Nothing to do in this case... + return 0; + } + + Messages_Iterator i = this->messages_.begin (); + Messages_Iterator end = this->messages_.end (); + + while (i != end + && (*i).key () < ack_join.next_sequence_number) + { + this->messages_.unbind ((*i).key ()); + i = this->messages_.begin (); + } + + this->next_expected_ = ack_join.next_sequence_number; + if (this->highest_received_ < ack_join.next_sequence_number) + this->highest_received_ = ack_join.next_sequence_number; + + this->push_queued_messages (); + } + + return 0; +} + +void +ACE_RMCast_Reordering::push_queued_messages (void) +{ + Messages_Iterator i = this->messages_.begin (); + Messages_Iterator end = this->messages_.end (); + + while (i != end + && (*i).key () == this->next_expected_) + { + if (this->next () != 0) + { + ACE_RMCast::Data data = (*i).item (); + this->next ()->data (data); + } + + this->messages_.unbind ((*i).key ()); + i = this->messages_.begin (); + this->next_expected_++; + } +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_Reordering.h b/protocols/ace/RMCast/RMCast_Reordering.h new file mode 100644 index 00000000000..0f6c777913c --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reordering.h @@ -0,0 +1,94 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace/RMCast +// +// = AUTHOR +// Carlos O'Ryan <coryan@uci.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_REORDERING_H +#define ACE_RMCAST_REORDERING_H +#include "ace/pre.h" + +#include "RMCast_Module.h" +#include "ace/RB_Tree.h" +#include "ace/Synch.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_RMCast_Proxy; + +//! Pass messages up in sent order +/*! + * Some applications require receivers to process messages in the same + * order that messages are sent. This module buffers out of order + * messages and only delivers a message if: + * - All the previous messages have been delivered. + * - The sender sends a notification that previous messages will not + * be resent. + * + * The module also sends the Ack feedback to the sender. + * + * NOTE: This is not the same as causal or total ordering, that could + * be implemented someday, but requires a lot more than what we have + * right now. + * + */ +class ACE_RMCast_Export ACE_RMCast_Reordering : public ACE_RMCast_Module +{ +public: + //! Constructor + ACE_RMCast_Reordering (void); + + //! Destructor + virtual ~ACE_RMCast_Reordering (void); + + //! Use a Red-Black Tree to keep the queue of messages + typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages; + typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages_Iterator; + + //! Remove messages still pending + virtual int close (void); + + //! Process a Data message. + /*! + * Process a Data message, sending the right Ack message back. + * The message is passed up only if it is in order. + */ + virtual int data (ACE_RMCast::Data &); + + //! During the join process the server informs us of the next + //! expected message + virtual int ack_join (ACE_RMCast::Ack_Join &); + +private: + //! Push any messages that are pending in the queue + void push_queued_messages (void); + +protected: + //! The reordering buffer + Messages messages_; + + //! The smallest value of \param next_expected for all the proxies + ACE_UINT32 next_expected_; + + //! The highest value of \param highest_received for all the proxies + ACE_UINT32 highest_received_; + + //! Synchronization + ACE_SYNCH_MUTEX mutex_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Reordering.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_REORDERING_H */ diff --git a/protocols/ace/RMCast/RMCast_Reordering.i b/protocols/ace/RMCast/RMCast_Reordering.i new file mode 100644 index 00000000000..ccbf852bc67 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reordering.i @@ -0,0 +1,8 @@ +// $Id$ + +ACE_INLINE +ACE_RMCast_Reordering::ACE_RMCast_Reordering (void) + : next_expected_ (0) + , highest_received_ (0) +{ +} diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp index 7e38cdf7c97..4d40a59ac5e 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.cpp +++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp @@ -33,9 +33,9 @@ public: { if (key > this->max_sequence_number_) return 0; - ACE_DEBUG ((LM_DEBUG, - " Retransmission::resend - message %d resent\n", - key)); + // ACE_DEBUG ((LM_DEBUG, + // " Retransmission::resend - message %d resent\n", + // key)); ACE_RMCast::Data data = item; int r = this->next_->data (data); if (r != 0) @@ -138,9 +138,9 @@ public: { if (key >= this->ack_.next_expected) return 0; - ACE_DEBUG ((LM_DEBUG, - " Retransmission::ack - message %d erased\n", - key)); + // ACE_DEBUG ((LM_DEBUG, + // " Retransmission::ack - message %d erased\n", + // key)); return this->messages_->unbind_i (this->ace_mon_, key); } @@ -159,11 +159,7 @@ private: int ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) { - Messages::Write_Guard ace_mon (this->messages_.mutex_, - this->messages_.cond_, - this->messages_.pending_writes_, - this->messages_.writing_, - this->messages_.collection_); + Messages::Write_Guard ace_mon (this->messages_); ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_); @@ -179,6 +175,7 @@ template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Container<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; |