summaryrefslogtreecommitdiff
path: root/protocols/ace
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-04 00:10:30 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-04 00:10:30 +0000
commit545131fddba871c7f71562c5234b92b1c6dc7df0 (patch)
tree9c3c15f5b079379f81cb5f7ba56feb809f264c0f /protocols/ace
parentb5011a72f7f744aab8a7001fb07bc1eeaa2faf03 (diff)
downloadATCD-545131fddba871c7f71562c5234b92b1c6dc7df0.tar.gz
ChangeLogTag:Tue Oct 3 17:07:37 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols/ace')
-rw-r--r--protocols/ace/RMCast/Makefile98
-rw-r--r--protocols/ace/RMCast/RMCast.h2
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.cpp56
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.h77
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.i1
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.cpp153
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.h94
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.i8
-rw-r--r--protocols/ace/RMCast/RMCast_Retransmission.cpp19
9 files changed, 442 insertions, 66 deletions
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile
index e97cd885493..868795fa82c 100644
--- a/protocols/ace/RMCast/Makefile
+++ b/protocols/ace/RMCast/Makefile
@@ -18,6 +18,7 @@ FILES= \
RMCast_Proxy \
RMCast_Membership \
RMCast_Retransmission \
+ RMCast_Reordering \
\
RMCast_IO_UDP \
RMCast_UDP_Event_Handler \
@@ -710,6 +711,103 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Message_Block_T.i \
$(ACE_ROOT)/ace/Message_Block_T.cpp
+.obj/RMCast_Reordering.o .obj/RMCast_Reordering.so .shobj/RMCast_Reordering.o .shobj/RMCast_Reordering.so: RMCast_Reordering.cpp RMCast_Reordering.h \
+ $(ACE_ROOT)/ace/pre.h \
+ RMCast_Module.h RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/svc_export.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/OS.i \
+ RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \
+ RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.i \
+ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \
+ $(ACE_ROOT)/ace/Based_Pointer_Repository.h \
+ $(ACE_ROOT)/ace/Singleton.h \
+ $(ACE_ROOT)/ace/Singleton.i \
+ $(ACE_ROOT)/ace/Singleton.cpp \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Managed_Object.cpp \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Free_List.cpp \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Malloc_T.cpp \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Containers_T.h \
+ $(ACE_ROOT)/ace/Containers_T.i \
+ $(ACE_ROOT)/ace/Containers_T.cpp \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/RB_Tree.cpp \
+ RMCast_Reordering.i RMCast_Proxy.h RMCast_Proxy.i \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/Message_Block_T.h \
+ $(ACE_ROOT)/ace/Message_Block_T.i \
+ $(ACE_ROOT)/ace/Message_Block_T.cpp
+
.obj/RMCast_IO_UDP.o .obj/RMCast_IO_UDP.so .shobj/RMCast_IO_UDP.o .shobj/RMCast_IO_UDP.so: RMCast_IO_UDP.cpp RMCast_IO_UDP.h \
$(ACE_ROOT)/ace/pre.h \
RMCast_Module.h RMCast.h \
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h
index df3a0d48858..55e9b5d0368 100644
--- a/protocols/ace/RMCast/RMCast.h
+++ b/protocols/ace/RMCast/RMCast.h
@@ -207,7 +207,7 @@ public:
*/
struct Ack_Join
{
- ACE_INT32 next_sequence_number;
+ ACE_UINT32 next_sequence_number;
//! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
index f1553c7f4ab..20a76e559e0 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -39,11 +39,8 @@ ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
ACE_RMCast_Copy_On_Write (void)
- : pending_writes_ (0)
- , writing_ (0)
- , cond_ (mutex_)
+ : ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> ()
{
- ACE_NEW (this->collection_, Collection);
}
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
@@ -69,8 +66,10 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
{
int r = worker->work ((*i).key (), (*i).item ());
- if (r != 0)
- return r;
+ if (r == 1)
+ return 0; // Abort loop, but no error
+ if (r == -1)
+ return -1;
}
return 0;
}
@@ -79,11 +78,7 @@ template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
ITEM const & i)
{
- Write_Guard ace_mon (this->mutex_,
- this->cond_,
- this->pending_writes_,
- this->writing_,
- this->collection_);
+ Write_Guard ace_mon (*this);
return this->bind_i (ace_mon, k, i);
}
@@ -91,19 +86,15 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k)
{
- Write_Guard ace_mon (this->mutex_,
- this->cond_,
- this->pending_writes_,
- this->writing_,
- this->collection_);
+ Write_Guard ace_mon (*this);
return this->unbind_i (ace_mon, k);
}
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon,
- KEY const & k,
- ITEM const & i)
+ KEY const & k,
+ ITEM const & i)
{
return ace_mon.copy->collection.bind (k, i);
}
@@ -118,18 +109,25 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon,
// ****************************************************************
template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>::ACE_RMCast_Copy_On_Write_Container (void)
+ : pending_writes_ (0)
+ , writing_ (0)
+ , cond_ (mutex_)
+{
+ ACE_NEW (this->collection_, Collection);
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m,
- ACE_SYNCH_CONDITION &c,
- int &p,
- int &w,
- Collection*& cr)
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container)
: copy (0)
- , mutex (m)
- , cond (c)
- , pending_writes (p)
- , writing_flag (w)
- , collection (cr)
+ , mutex (container.mutex_)
+ , cond (container.cond_)
+ , pending_writes (container.pending_writes_)
+ , writing_flag (container.writing_)
+ , collection (container.collection_)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
@@ -168,6 +166,8 @@ ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
this->cond.signal ();
}
// Delete outside the mutex, because it may take a long time.
+ // @@ Is this right? What happens if several readers are still
+ // using the old copy?
tmp->_decr_refcnt ();
}
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
index 8724e23a5d5..e06aa5f1689 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.h
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
@@ -57,6 +57,47 @@ private:
// ****************************************************************
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Write_Guard;
+
+//! Base class for the Copy_On_Write collection, used to simplify the
+//! declaration of the Write_Guard
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Container
+{
+public:
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Container (void);
+
+ //! Let the Write_Guard access the internal fields.
+ friend ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>;
+
+ //! A shorter name for the actual collection type
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+protected:
+ //! Number of pending writes
+ int pending_writes_;
+
+ //! If non-zero then a thread is changing the collection.
+ /*!
+ * Many threads can use the collection simulatenously, but only one
+ * change it.
+ */
+ int writing_;
+
+ //! A mutex to serialize access to the collection pointer.
+ ACE_SYNCH_MUTEX mutex_;
+
+ //! A condition variable to wait to synchronize multiple writers.
+ ACE_SYNCH_CONDITION cond_;
+
+ //! The collection, with reference counting added
+ Collection *collection_;
+};
+
+// ****************************************************************
+
//! Implement the write guard for a reference counted collecion
/*!
* This helper class atomically increments the reference count of a
@@ -70,11 +111,7 @@ public:
typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
//! Constructor
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex,
- ACE_SYNCH_CONDITION &cond,
- int &pending_writes,
- int &writing_flag,
- Collection*& collection);
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container);
//! Destructor
~ACE_RMCast_Copy_On_Write_Write_Guard (void);
@@ -100,11 +137,20 @@ private:
Collection *&collection;
};
+
// ****************************************************************
//! Implement a copy on write wrapper for a map-like collection
+/*
+ *
+ * <B>WARNING: </B> This class may be moved away in the future, I'm
+ * investigating how it could be converted into a reusable component
+ * in ACE. I won't make promises on when will that happen, but I
+ * won't promise that it will stay here either.
+ *
+ */
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
-class ACE_RMCast_Copy_On_Write
+class ACE_RMCast_Copy_On_Write : public ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>
{
public:
//! The Read_Guard trait
@@ -136,25 +182,6 @@ public:
//! Unbind assuming the Write_Guard is held
int unbind_i (Write_Guard &guard, KEY const & key);
-
- //! Number of pending writes
- int pending_writes_;
-
- //! If non-zero then a thread is changing the collection.
- /*!
- * Many threads can use the collection simulatenously, but only one
- * change it.
- */
- int writing_;
-
- //! A mutex to serialize access to the collection pointer.
- ACE_SYNCH_MUTEX mutex_;
-
- //! A condition variable to wait to synchronize multiple writers.
- ACE_SYNCH_CONDITION cond_;
-
- //! The collection, with reference counting added
- Collection *collection_;
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
index c6e5099cda5..354dd51bf5a 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.i
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
@@ -33,4 +33,3 @@ ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
}
// ****************************************************************
-
diff --git a/protocols/ace/RMCast/RMCast_Reordering.cpp b/protocols/ace/RMCast/RMCast_Reordering.cpp
new file mode 100644
index 00000000000..f63c6c66f34
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reordering.cpp
@@ -0,0 +1,153 @@
+//
+// $Id$
+//
+
+#include "RMCast_Reordering.h"
+#include "RMCast_Proxy.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_Reordering.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Reordering, "$Id$")
+
+ACE_RMCast_Reordering::~ACE_RMCast_Reordering (void)
+{
+}
+
+int
+ACE_RMCast_Reordering::close (void)
+{
+ // @@
+ return 0;
+}
+
+int
+ACE_RMCast_Reordering::data (ACE_RMCast::Data &data)
+{
+ int must_ack = 0;
+ ACE_RMCast::Ack ack;
+
+ // ACE_DEBUG ((LM_DEBUG, "Received message (%d)\n", data.sequence_number));
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ if (data.sequence_number < this->next_expected_)
+ {
+ // Old message. Ack with the current status (look at the end
+ // of this block).
+ must_ack = 1;
+
+ // ACE_DEBUG ((LM_DEBUG, ".... old message is ignored\n"));
+ }
+
+ else if (data.sequence_number == this->next_expected_)
+ {
+ // ACE_DEBUG ((LM_DEBUG, ".... message is in order, received\n"));
+
+ // Accept the message, the current thread will dispatch it, so
+ // it is marked as accepted (using the <next_expected> field).
+ // Any other thread will not push that message because now it
+ // is "old".
+
+ this->next_expected_++;
+
+ // Right message, process as many messages as possible from
+ // the queue, then ack the right level...
+
+ // NOTE: we cannot release the mutex while dispatching
+ // events, otherwise: how do we stop other threads from
+ // delivering messages out of order? I.E. what if the
+ // next thread receives the next message?
+ if (this->next () != 0)
+ (void) this->next ()->data (data);
+
+ // After delivering one message there may be more messages
+ // pending
+ this->push_queued_messages ();
+
+ //@@ This should be strategized, for example, only Ack if
+ // there is a message out of order or something, otherwise
+ // continue with happiness. That works well for "optimistic
+ // models".
+ must_ack = 1;
+ }
+
+ else
+ {
+ // ACE_DEBUG ((LM_DEBUG, ".... message out of sequence, saved\n"));
+
+ // Out of sequence.
+ if (this->highest_received_ < data.sequence_number)
+ {
+ this->highest_received_ = data.sequence_number;
+ }
+ (void) this->messages_.bind (data.sequence_number, data);
+ // re-ack, otherwise save it and ack.
+ }
+
+ ack.next_expected = this->next_expected_;
+ ack.highest_received = this->highest_received_;
+ }
+
+ if (!must_ack || data.source == 0)
+ return 0;
+ return data.source->reply_ack (ack);
+}
+
+int
+ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+ if (this->next_expected_ >= ack_join.next_sequence_number)
+ {
+ // Nothing to do in this case...
+ return 0;
+ }
+
+ Messages_Iterator i = this->messages_.begin ();
+ Messages_Iterator end = this->messages_.end ();
+
+ while (i != end
+ && (*i).key () < ack_join.next_sequence_number)
+ {
+ this->messages_.unbind ((*i).key ());
+ i = this->messages_.begin ();
+ }
+
+ this->next_expected_ = ack_join.next_sequence_number;
+ if (this->highest_received_ < ack_join.next_sequence_number)
+ this->highest_received_ = ack_join.next_sequence_number;
+
+ this->push_queued_messages ();
+ }
+
+ return 0;
+}
+
+void
+ACE_RMCast_Reordering::push_queued_messages (void)
+{
+ Messages_Iterator i = this->messages_.begin ();
+ Messages_Iterator end = this->messages_.end ();
+
+ while (i != end
+ && (*i).key () == this->next_expected_)
+ {
+ if (this->next () != 0)
+ {
+ ACE_RMCast::Data data = (*i).item ();
+ this->next ()->data (data);
+ }
+
+ this->messages_.unbind ((*i).key ());
+ i = this->messages_.begin ();
+ this->next_expected_++;
+ }
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_Reordering.h b/protocols/ace/RMCast/RMCast_Reordering.h
new file mode 100644
index 00000000000..0f6c777913c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reordering.h
@@ -0,0 +1,94 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace/RMCast
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_REORDERING_H
+#define ACE_RMCAST_REORDERING_H
+#include "ace/pre.h"
+
+#include "RMCast_Module.h"
+#include "ace/RB_Tree.h"
+#include "ace/Synch.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Proxy;
+
+//! Pass messages up in sent order
+/*!
+ * Some applications require receivers to process messages in the same
+ * order that messages are sent. This module buffers out of order
+ * messages and only delivers a message if:
+ * - All the previous messages have been delivered.
+ * - The sender sends a notification that previous messages will not
+ * be resent.
+ *
+ * The module also sends the Ack feedback to the sender.
+ *
+ * NOTE: This is not the same as causal or total ordering, that could
+ * be implemented someday, but requires a lot more than what we have
+ * right now.
+ *
+ */
+class ACE_RMCast_Export ACE_RMCast_Reordering : public ACE_RMCast_Module
+{
+public:
+ //! Constructor
+ ACE_RMCast_Reordering (void);
+
+ //! Destructor
+ virtual ~ACE_RMCast_Reordering (void);
+
+ //! Use a Red-Black Tree to keep the queue of messages
+ typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages;
+ typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages_Iterator;
+
+ //! Remove messages still pending
+ virtual int close (void);
+
+ //! Process a Data message.
+ /*!
+ * Process a Data message, sending the right Ack message back.
+ * The message is passed up only if it is in order.
+ */
+ virtual int data (ACE_RMCast::Data &);
+
+ //! During the join process the server informs us of the next
+ //! expected message
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+
+private:
+ //! Push any messages that are pending in the queue
+ void push_queued_messages (void);
+
+protected:
+ //! The reordering buffer
+ Messages messages_;
+
+ //! The smallest value of \param next_expected for all the proxies
+ ACE_UINT32 next_expected_;
+
+ //! The highest value of \param highest_received for all the proxies
+ ACE_UINT32 highest_received_;
+
+ //! Synchronization
+ ACE_SYNCH_MUTEX mutex_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Reordering.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_REORDERING_H */
diff --git a/protocols/ace/RMCast/RMCast_Reordering.i b/protocols/ace/RMCast/RMCast_Reordering.i
new file mode 100644
index 00000000000..ccbf852bc67
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reordering.i
@@ -0,0 +1,8 @@
+// $Id$
+
+ACE_INLINE
+ACE_RMCast_Reordering::ACE_RMCast_Reordering (void)
+ : next_expected_ (0)
+ , highest_received_ (0)
+{
+}
diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp
index 7e38cdf7c97..4d40a59ac5e 100644
--- a/protocols/ace/RMCast/RMCast_Retransmission.cpp
+++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp
@@ -33,9 +33,9 @@ public:
{
if (key > this->max_sequence_number_)
return 0;
- ACE_DEBUG ((LM_DEBUG,
- " Retransmission::resend - message %d resent\n",
- key));
+ // ACE_DEBUG ((LM_DEBUG,
+ // " Retransmission::resend - message %d resent\n",
+ // key));
ACE_RMCast::Data data = item;
int r = this->next_->data (data);
if (r != 0)
@@ -138,9 +138,9 @@ public:
{
if (key >= this->ack_.next_expected)
return 0;
- ACE_DEBUG ((LM_DEBUG,
- " Retransmission::ack - message %d erased\n",
- key));
+ // ACE_DEBUG ((LM_DEBUG,
+ // " Retransmission::ack - message %d erased\n",
+ // key));
return this->messages_->unbind_i (this->ace_mon_, key);
}
@@ -159,11 +159,7 @@ private:
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- Messages::Write_Guard ace_mon (this->messages_.mutex_,
- this->messages_.cond_,
- this->messages_.pending_writes_,
- this->messages_.writing_,
- this->messages_.collection_);
+ Messages::Write_Guard ace_mon (this->messages_);
ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
@@ -179,6 +175,7 @@ template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Container<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;