summaryrefslogtreecommitdiff
path: root/ace/RMCast
diff options
context:
space:
mode:
Diffstat (limited to 'ace/RMCast')
-rw-r--r--ace/RMCast/Makefile20
-rw-r--r--ace/RMCast/RMCast.h15
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.cpp176
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.h173
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.i36
-rw-r--r--ace/RMCast/RMCast_Fragment.h40
-rw-r--r--ace/RMCast/RMCast_IO_UDP.cpp2
-rw-r--r--ace/RMCast/RMCast_IO_UDP.h65
-rw-r--r--ace/RMCast/RMCast_Membership.cpp24
-rw-r--r--ace/RMCast/RMCast_Membership.h46
-rw-r--r--ace/RMCast/RMCast_Membership.i2
-rw-r--r--ace/RMCast/RMCast_Module.h32
-rw-r--r--ace/RMCast/RMCast_Module_Factory.h31
-rw-r--r--ace/RMCast/RMCast_Partial_Message.h40
-rw-r--r--ace/RMCast/RMCast_Proxy.cpp6
-rw-r--r--ace/RMCast/RMCast_Proxy.h27
-rw-r--r--ace/RMCast/RMCast_Proxy.i2
-rw-r--r--ace/RMCast/RMCast_Retransmission.cpp127
-rw-r--r--ace/RMCast/RMCast_Retransmission.h67
-rw-r--r--ace/RMCast/RMCast_UDP_Event_Handler.h40
-rw-r--r--ace/RMCast/RMCast_UDP_Proxy.cpp3
-rw-r--r--ace/RMCast/RMCast_Worker.cpp19
-rw-r--r--ace/RMCast/RMCast_Worker.h36
-rw-r--r--ace/RMCast/RMCast_Worker.i1
24 files changed, 852 insertions, 178 deletions
diff --git a/ace/RMCast/Makefile b/ace/RMCast/Makefile
index ae13792c4c4..e97cd885493 100644
--- a/ace/RMCast/Makefile
+++ b/ace/RMCast/Makefile
@@ -636,17 +636,11 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Basic_Types.i \
$(ACE_ROOT)/ace/Trace.h \
$(ACE_ROOT)/ace/OS.i \
- RMCast_Export.h RMCast.i RMCast_Module.i \
- $(ACE_ROOT)/ace/RB_Tree.h \
- $(ACE_ROOT)/ace/Functor.h \
+ RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \
+ RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/ACE.h \
$(ACE_ROOT)/ace/ACE.i \
- $(ACE_ROOT)/ace/Functor.i \
- $(ACE_ROOT)/ace/Functor_T.h \
- $(ACE_ROOT)/ace/Functor_T.i \
- $(ACE_ROOT)/ace/Functor_T.cpp \
- $(ACE_ROOT)/ace/RB_Tree.i \
- $(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/Synch.i \
$(ACE_ROOT)/ace/Synch_T.h \
$(ACE_ROOT)/ace/Event_Handler.h \
@@ -664,6 +658,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Log_Record.h \
$(ACE_ROOT)/ace/Log_Priority.h \
$(ACE_ROOT)/ace/Log_Record.i \
+ RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
$(ACE_ROOT)/ace/Malloc.h \
$(ACE_ROOT)/ace/Malloc_Base.h \
$(ACE_ROOT)/ace/Based_Pointer_T.h \
diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h
index abf2a24e946..df3a0d48858 100644
--- a/ace/RMCast/RMCast.h
+++ b/ace/RMCast/RMCast.h
@@ -233,16 +233,19 @@ public:
*
* This message is used to provide feedback information to senders.
* It contains two sequence numbers:
- * - highest_in_sequence: is the sequence number of the last message
- * received without any lost messages before it
- * - highest_received: is the sequence number of the last_message
- * successfully received, there may be some messages lost before it
+ * - \param next_expected: is the sequence number of the next message
+ * expected, i.e. (next_expected-1) is the last message received
+ * without any losses before it.
+ * - \param highest_received: is the highest sequence number among
+ * all the messages successfully received.
+ * In other words, all messages lost (if any) are in the range:
+ * [next_expected,highest_received)
*
* <CODE>
* +---------+----------------------+<BR>
* | 8 bits | MT_ACK |<BR>
* +---------+----------------------+<BR>
- * | 32 bits | highest_in_sequence |<BR>
+ * | 32 bits | next_expected |<BR>
* +---------+----------------------+<BR>
* | 32 bits | highest_received |<BR>
* +---------+----------------------+<BR>
@@ -251,7 +254,7 @@ public:
struct Ack
{
//! The last message received without any losses before it.
- ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 next_expected;
//! The last message successfully received
ACE_UINT32 highest_received;
diff --git a/ace/RMCast/RMCast_Copy_On_Write.cpp b/ace/RMCast/RMCast_Copy_On_Write.cpp
new file mode 100644
index 00000000000..f1553c7f4ab
--- /dev/null
+++ b/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -0,0 +1,176 @@
+// $Id$
+
+#ifndef ACE_RMCAST_COPY_ON_WRITE_CPP
+#define ACE_RMCAST_COPY_ON_WRITE_CPP
+
+#include "RMCast_Copy_On_Write.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "RMCast_Copy_On_Write.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(RMCast, RMCast_Copy_On_Write, "$Id$")
+
+template<class COLLECTION, class ITERATOR> void
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_incr_refcnt (void)
+{
+ // LOCKING: no locking is required, the caller grabs the mutex.
+ this->refcount_++;
+}
+
+template<class COLLECTION, class ITERATOR> void
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
+{
+ // LOCKING: no locking is required, the caller grabs the mutex.
+ {
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return;
+ }
+ //@@ TODO: If this wrapper is going to be completely general some
+ // kind of functor has to be provided to remove the elements in the
+ // collection, in case the are no self-managed
+
+ delete this;
+}
+
+// ****************************************************************
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write (void)
+ : pending_writes_ (0)
+ , writing_ (0)
+ , cond_ (mutex_)
+{
+ ACE_NEW (this->collection_, Collection);
+}
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write (void)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+
+ while (this->pending_writes_ != 0)
+ this->cond_.wait ();
+
+ this->collection_->_decr_refcnt ();
+ this->collection_ = 0;
+}
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ for_each (ACE_RMCast_Worker<KEY,ITEM> *worker)
+{
+ Read_Guard ace_mon (this->mutex_, this->collection_);
+
+ ITERATOR end = ace_mon.collection->collection.end ();
+ for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
+ {
+ int r = worker->work ((*i).key (), (*i).item ());
+ if (r != 0)
+ return r;
+ }
+ return 0;
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
+ ITEM const & i)
+{
+ Write_Guard ace_mon (this->mutex_,
+ this->cond_,
+ this->pending_writes_,
+ this->writing_,
+ this->collection_);
+
+ return this->bind_i (ace_mon, k, i);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k)
+{
+ Write_Guard ace_mon (this->mutex_,
+ this->cond_,
+ this->pending_writes_,
+ this->writing_,
+ this->collection_);
+
+ return this->unbind_i (ace_mon, k);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon,
+ KEY const & k,
+ ITEM const & i)
+{
+ return ace_mon.copy->collection.bind (k, i);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon,
+ KEY const & k)
+{
+ return ace_mon.copy->collection.unbind (k);
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m,
+ ACE_SYNCH_CONDITION &c,
+ int &p,
+ int &w,
+ Collection*& cr)
+ : copy (0)
+ , mutex (m)
+ , cond (c)
+ , pending_writes (p)
+ , writing_flag (w)
+ , collection (cr)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
+
+ this->pending_writes++;
+
+ while (this->writing_flag != 0)
+ this->cond.wait ();
+
+ this->writing_flag = 1;
+ }
+
+ // Copy outside the mutex, because it may take a long time.
+ // Nobody can change it, because it is protected by the
+ // writing_flag.
+
+ // First initialize it (with the correct reference count
+ ACE_NEW (this->copy, Collection);
+ // Copy the contents
+ this->copy->collection = this->collection->collection;
+}
+
+template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write_Write_Guard (void)
+{
+ Collection *tmp = 0;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
+
+ tmp = this->collection;
+ this->collection = this->copy;
+ this->writing_flag = 0;
+ this->pending_writes--;
+
+ this->cond.signal ();
+ }
+ // Delete outside the mutex, because it may take a long time.
+ tmp->_decr_refcnt ();
+}
+
+// ****************************************************************
+
+#endif /* ACE_RMCAST_COPY_ON_WRITE_CPP */
diff --git a/ace/RMCast/RMCast_Copy_On_Write.h b/ace/RMCast/RMCast_Copy_On_Write.h
new file mode 100644
index 00000000000..8724e23a5d5
--- /dev/null
+++ b/ace/RMCast/RMCast_Copy_On_Write.h
@@ -0,0 +1,173 @@
+/* -*- C++ -*- */
+// $Id$
+//
+
+#ifndef ACE_RMCAST_COPY_ON_WRITE_H
+#define ACE_RMCAST_COPY_ON_WRITE_H
+#include "ace/pre.h"
+
+#include "RMCast_Worker.h"
+#include "ace/Synch.h"
+
+//! A wrapper to implement reference counted collections
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Collection
+{
+public:
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Collection (void);
+
+ //! Increment the reference count
+ void _incr_refcnt (void);
+
+ //! Decrement the reference count
+ void _decr_refcnt (void);
+
+ //! The actual collection
+ COLLECTION collection;
+
+private:
+ //! The reference count
+ ACE_UINT32 refcount_;
+};
+
+// ****************************************************************
+
+//! Implement a read guard for a reference counted collection
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Read_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex,
+ Collection *&collection);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void);
+
+ //! A reference to the collection
+ Collection *collection;
+
+private:
+ //! Synchronization
+ ACE_SYNCH_MUTEX &mutex_;
+};
+
+// ****************************************************************
+
+//! Implement the write guard for a reference counted collecion
+/*!
+ * This helper class atomically increments the reference count of a
+ * ACE_RMCast_Copy_On_Write_Collection and reads the current
+ * collection in the Copy_On_Write class.
+ */
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Write_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex,
+ ACE_SYNCH_CONDITION &cond,
+ int &pending_writes,
+ int &writing_flag,
+ Collection*& collection);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Write_Guard (void);
+
+ //! The collection
+ Collection *copy;
+
+private:
+ //! Keep a reference to the mutex
+ ACE_SYNCH_MUTEX &mutex;
+
+ //! Keep a reference to the condition variable
+ ACE_SYNCH_CONDITION &cond;
+
+ //! Use a reference to update the pending writes count
+ int &pending_writes;
+
+ //! Use a reference to update the writing flag
+ int &writing_flag;
+
+ //! Use this reference to update the collection once the
+ //! modifications are finished.
+ Collection *&collection;
+};
+
+// ****************************************************************
+
+//! Implement a copy on write wrapper for a map-like collection
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write
+{
+public:
+ //! The Read_Guard trait
+ typedef ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR> Read_Guard;
+
+ //! The Write_Guard trait
+ typedef ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR> Write_Guard;
+
+ //! The underlying collection type
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write (void);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write (void);
+
+ //! Iterate over all the elements invoking \param worker on each one.
+ int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker);
+
+ //! Add a new element
+ int bind (KEY const & key, ITEM const & item);
+
+ //! Remove an element
+ int unbind (KEY const & key);
+
+ //! Bind assuming the Write_Guard is held
+ int bind_i (Write_Guard &guard, KEY const & key, ITEM const & item);
+
+ //! Unbind assuming the Write_Guard is held
+ int unbind_i (Write_Guard &guard, KEY const & key);
+
+ //! Number of pending writes
+ int pending_writes_;
+
+ //! If non-zero then a thread is changing the collection.
+ /*!
+ * Many threads can use the collection simulatenously, but only one
+ * change it.
+ */
+ int writing_;
+
+ //! A mutex to serialize access to the collection pointer.
+ ACE_SYNCH_MUTEX mutex_;
+
+ //! A condition variable to wait to synchronize multiple writers.
+ ACE_SYNCH_CONDITION cond_;
+
+ //! The collection, with reference counting added
+ Collection *collection_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Copy_On_Write.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Copy_On_Write.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Copy_On_Write.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_COPY_ON_WRITE_H */
diff --git a/ace/RMCast/RMCast_Copy_On_Write.i b/ace/RMCast/RMCast_Copy_On_Write.i
new file mode 100644
index 00000000000..c6e5099cda5
--- /dev/null
+++ b/ace/RMCast/RMCast_Copy_On_Write.i
@@ -0,0 +1,36 @@
+// $Id$
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Collection (void)
+ : refcount_ (1)
+{
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m,
+ Collection*& collection_ref)
+ : collection (0)
+ , mutex_ (m)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->collection = collection_ref;
+ this->collection->_incr_refcnt ();
+}
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void)
+{
+ if (this->collection != 0)
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->collection->_decr_refcnt ();
+ }
+}
+
+// ****************************************************************
+
diff --git a/ace/RMCast/RMCast_Fragment.h b/ace/RMCast/RMCast_Fragment.h
index 7b64d763ebc..eed08c92517 100644
--- a/ace/RMCast/RMCast_Fragment.h
+++ b/ace/RMCast/RMCast_Fragment.h
@@ -1,15 +1,5 @@
// $Id$
-// ============================================================================
-//
-// = DESCRIPTION
-// The fragmentation task for the reliable multicast library
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
#ifndef ACE_RMCAST_FRAGMENT_H
#define ACE_RMCAST_FRAGMENT_H
#include "ace/pre.h"
@@ -21,28 +11,46 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+//! Default fragment size
#ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE
# define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024
#endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */
+//! Fragmentation module
+/*!
+ * Some transports cannot send very big messages, for example UDP
+ * imposes a limit of 64K, and in practice the limit is even more
+ * strict than that.
+ * This class decomposes a message into multiple fragments, using an
+ * application defined maximum size.
+ */
class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_RMCast_Module
{
public:
+ //! Constructor
ACE_RMCast_Fragment (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Fragment (void);
- // Destructor
+ //! Accessor for the max_fragment size.
+ /*! There is no modifier, the maximum fragment size is obtained
+ * using feedback from the lower layers (transport?)
+ * @@TODO We have not implemented the feedback mechanisms yet!
+ */
size_t max_fragment_size (void) const;
- // Accessor for the max_fragment size.
- // There is no modifier, the maximum fragment size is obtained using
- // feedback from the lower layer (transport?)
- // = The ACE_RMCast_Module methods
+ /*!
+ * Only data messages need fragmentation, the control messages are
+ * all small enough for all the transports that I know about.
+ * Well, actually for CAN-Bus (Controller Area Network), they may be
+ * too big, because the max payload there is 8 bytes, but we don't
+ * play with those in ACE.
+ */
virtual int data (ACE_RMCast::Data &data);
private:
+ //! Current fragment size limit
size_t max_fragment_size_;
};
diff --git a/ace/RMCast/RMCast_IO_UDP.cpp b/ace/RMCast/RMCast_IO_UDP.cpp
index af655f3130f..421982d5ad6 100644
--- a/ace/RMCast/RMCast_IO_UDP.cpp
+++ b/ace/RMCast/RMCast_IO_UDP.cpp
@@ -354,7 +354,7 @@ ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
char header[16];
header[0] = ACE_RMCast::MT_ACK;
- ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_UINT32 tmp = ACE_HTONL (ack.next_expected);
ACE_OS::memcpy (header + 1,
&tmp, sizeof(ACE_UINT32));
tmp = ACE_HTONL (ack.highest_received);
diff --git a/ace/RMCast/RMCast_IO_UDP.h b/ace/RMCast/RMCast_IO_UDP.h
index bdcccabe6e1..5af403bf994 100644
--- a/ace/RMCast/RMCast_IO_UDP.h
+++ b/ace/RMCast/RMCast_IO_UDP.h
@@ -33,44 +33,65 @@ class ACE_Time_Value;
class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module
{
public:
+ //! Constructor
+ /*!
+ * The <factory> argument is used to create the modules for each
+ * proxy that process incoming messages. The class does *not* assume
+ * ownership of <factory>, the caller owns it. But it does assume
+ * ownership of the modules returned by the factory, and it may ask
+ * the factory to release them eventually.
+ */
ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory);
- // Constructor
- // <factory> is used to create the modules for each proxy that
- // process incoming messages. The class does *not* assume ownership
- // of <factory>, the caller owns it.
+ //! Destructor
~ACE_RMCast_IO_UDP (void);
- // Destructor
+ //! Join a new multicast group
+ /*!
+ * Start receiving data for the <mcast_addr> multicast group.
+ * Please read the documentation of ACE_SOCK_Dgram_Mcast for more
+ * details.
+ */
int subscribe (const ACE_INET_Addr &mcast_addr,
int reuse_addr = 1,
const ACE_TCHAR *net_if = 0,
int protocol_family = PF_INET,
int protocol = 0);
- // Start receiving data for the <mcast_addr> multicast group.
- // Please read the documentation of <ACE_SOCK_Dgram_Mcast> for more
- // details.
// The class can be used with a Reactor or using blocking I/O
// depending on what method of the following two is called.
+ //! Wait for events for the period <tv>. If <tv> is zero it blocks
+ //! forever.
int handle_events (ACE_Time_Value *tv = 0);
- // Wait for events for the period <tv>. If <tv> is zero it blocks
- // forever.
+ //! Register any event handlers into <reactor>
+ /*!
+ * @@TODO: This should be left for the clients of the class, there
+ * is no reason why this class must know about reactors.
+ */
int register_handlers (ACE_Reactor *reactor);
- // Register any event handlers into <reactor>
+ //! Remove all the handlers from the reactor
+ /*!
+ * @@TODO: This should be left for the clients of the class, there
+ * is no reason why this class must know about reactors.
+ */
int remove_handlers (void);
- // Remove all the handlers from the reactor
+ //! There is data to read, read it and process it.
int handle_input (ACE_HANDLE h);
- // There is data to read, read it and process it.
+ //! Obtain the handle for the underlying socket
ACE_HANDLE get_handle (void) const;
- // Obtain the handle for the underlying socket
- // Send back to the remove object represented by <proxy>
+ //@{
+ //! Send the message to the ACE_INET_Addr argument.
+ /*!
+ * These methods are used in the implementation of the
+ * ACE_RMCast_UDP_Proxy objects and the implementation of the
+ * inherited ACE_RMCast_Module methods in this class.
+ */
int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &);
int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &);
int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &);
@@ -78,8 +99,9 @@ public:
int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &);
int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &);
int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &);
+ //@}
- // = The RMCast_Module methods
+ // Please read the documentation in ACE_RMCast_Module for more details
virtual int data (ACE_RMCast::Data &);
virtual int poll (ACE_RMCast::Poll &);
virtual int ack_join (ACE_RMCast::Ack_Join &);
@@ -87,23 +109,24 @@ public:
virtual int ack (ACE_RMCast::Ack &);
virtual int join (ACE_RMCast::Join &);
virtual int leave (ACE_RMCast::Leave &);
- // The messages are sent to the multicast group
private:
+ //! The factory used to create the modules attached to each proxy
ACE_RMCast_Module_Factory *factory_;
- // The factory used to create the modules attached to each proxy
+ //! The multicast group we subscribe and send to
ACE_INET_Addr mcast_group_;
- // The multicast group we subscribe and send to
+ //! The socket used to receive and send data
ACE_SOCK_Dgram_Mcast dgram_;
- // The socket
+ //! Use a Hash_Map to maintain the collection of proxies
typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map;
+ //! The collection of proxies
Map map_;
+ //! The event handler adapter
ACE_RMCast_UDP_Event_Handler eh_;
- // The event handler adapter
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Membership.cpp b/ace/RMCast/RMCast_Membership.cpp
index 6ee2690a41f..a23d7a756e5 100644
--- a/ace/RMCast/RMCast_Membership.cpp
+++ b/ace/RMCast/RMCast_Membership.cpp
@@ -28,14 +28,14 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
ACE_RMCast::Ack next_ack;
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- if (ack.highest_in_sequence < this->highest_in_sequence_)
+ if (ack.next_expected < this->next_expected_)
{
// @@ This violates an invariant of the class, shouldn't
// happen...
// ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n"));
return -1;
}
- else if (ack.highest_in_sequence == this->highest_in_sequence_)
+ else if (ack.next_expected == this->next_expected_)
{
// Nothing new, just continue....
// ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n"));
@@ -43,21 +43,23 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
}
// Possible update, re-evaluate the story...
- ACE_UINT32 highest_in_sequence = (*i)->highest_in_sequence ();
+ ACE_UINT32 next_expected = (*i)->next_expected ();
ACE_UINT32 highest_received = (*i)->highest_received ();
++i;
for (; i != end; ++i)
{
- ACE_UINT32 s = (*i)->highest_in_sequence ();
- if (s < highest_in_sequence)
- highest_in_sequence = s;
+ ACE_UINT32 s = (*i)->next_expected ();
+ if (s < next_expected)
+ next_expected = s;
ACE_UINT32 r = (*i)->highest_received ();
if (r > highest_received)
highest_received = r;
}
#if 0
- if (this->highest_in_sequence_ >= highest_in_sequence
+ // @@TODO: this is an important feature, disabled until it is
+ // fully debugged
+ if (this->next_expected_ >= next_expected
|| this->highest_received_ >= highest_received)
{
// No change....
@@ -65,12 +67,12 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
return 0;
}
#endif /* 0 */
- this->highest_in_sequence_ = highest_in_sequence;
+ this->next_expected_ = next_expected;
this->highest_received_ = highest_received;
if (this->next () == 0)
return 0;
next_ack.source = ack.source;
- next_ack.highest_in_sequence = this->highest_in_sequence_;
+ next_ack.next_expected = this->next_expected_;
next_ack.highest_received = this->highest_received_;
}
// @@ This looks like a race condition, next() is checked inside the
@@ -89,6 +91,8 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join)
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
if (this->proxies_.insert (join.source) == -1)
return -1;
+ // @@TODO: This may change the next Ack to send up, should
+ // recompute and send the right message if that was the case.
}
return this->ACE_RMCast_Module::join (join);
@@ -103,6 +107,8 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
(void) this->proxies_.remove (leave.source);
+ // @@TODO: This may change the next Ack to send up, should
+ // recompute and send the right message if that was the case.
}
return this->ACE_RMCast_Module::leave (leave);
diff --git a/ace/RMCast/RMCast_Membership.h b/ace/RMCast/RMCast_Membership.h
index a99a7752507..21ee1bea97b 100644
--- a/ace/RMCast/RMCast_Membership.h
+++ b/ace/RMCast/RMCast_Membership.h
@@ -28,41 +28,59 @@
class ACE_RMCast_Proxy;
+//! Track peer membership
+/*!
+ * Reliable senders of events need to know exactly how many peers are
+ * receiving the events, and how many events has each peer received so
+ * far.
+ * This class uses the Join, Leave and Ack messages to build that
+ * information, it also summarizes the Ack events and propagate only
+ * the global info to the upper layer.
+ */
class ACE_RMCast_Export ACE_RMCast_Membership : public ACE_RMCast_Module
{
- // = TITLE
- // Track Receiver membership
- //
- // = DESCRIPTION
- // Define the interface for all reliable multicast membership
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Membership (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Membership (void);
- // Destructor
- // = The RMCast_Module methods
+ //! Receive an process an Ack message
+ /*!
+ * After receiving the Ack message we find out what is the lowest
+ * sequence number received in order among all the acks received by
+ * the proxies in the collection. We also find out what is the
+ * highest sequence number received by any proxy.
+ * We only propagate that information back to the upper layer, and
+ * then only if there are any news since the last Ack.
+ */
virtual int ack (ACE_RMCast::Ack &);
+
+ //! Add a new member to the collection, using the <source> field in
+ //! the Join message
virtual int join (ACE_RMCast::Join &);
+
+ //! Remove a member from the collection, using the <source> field in
+ //! the Join message
virtual int leave (ACE_RMCast::Leave &);
protected:
+ //! Use an unbounded set to maintain the collection of proxies.
typedef ACE_Unbounded_Set<ACE_RMCast_Proxy*> Proxy_Collection;
typedef ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*> Proxy_Iterator;
+ //! The collection of proxies
Proxy_Collection proxies_;
- // The membership buffer
- ACE_UINT32 highest_in_sequence_;
- // The smallest value of <highest_in_sequence> for all the proxies
+ //! The smallest value of \param next_expected for all the proxies
+ ACE_UINT32 next_expected_;
+ //! The highest value of \param highest_received for all the proxies
ACE_UINT32 highest_received_;
- // The highest value of <highest_received> for all the proxies
+ //! Synchronization
ACE_SYNCH_MUTEX mutex_;
- // Synchronization
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Membership.i b/ace/RMCast/RMCast_Membership.i
index 0c3e33c2d01..b513c2d5141 100644
--- a/ace/RMCast/RMCast_Membership.i
+++ b/ace/RMCast/RMCast_Membership.i
@@ -2,7 +2,7 @@
ACE_INLINE
ACE_RMCast_Membership::ACE_RMCast_Membership (void)
- : highest_in_sequence_ (0)
+ : next_expected_ (0)
, highest_received_ (0)
{
}
diff --git a/ace/RMCast/RMCast_Module.h b/ace/RMCast/RMCast_Module.h
index dc4077fa4ab..fad76caac53 100644
--- a/ace/RMCast/RMCast_Module.h
+++ b/ace/RMCast/RMCast_Module.h
@@ -36,55 +36,55 @@ class ACE_Time_Value;
class ACE_RMCast_Export ACE_RMCast_Module
{
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Module (void);
- //!< Constructor
+ //! Destructor
virtual ~ACE_RMCast_Module (void);
- //!< Destructor
+ //! Modifier for the next element in the stack
virtual int next (ACE_RMCast_Module *next);
- //!< Modifier for the next element in the stack
+ //! Accesor for the next element in the stack
virtual ACE_RMCast_Module* next (void) const;
- //!< Accesor for the next element in the stack
+ //! Modifier for the previous element in the stack
virtual int prev (ACE_RMCast_Module *prev);
- //!< Modifier for the previous element in the stack
+ //! Accesor for the previous element in the stack
virtual ACE_RMCast_Module* prev (void) const;
- //!< Accesor for the previous element in the stack
+ //! Initialize the module, setting up the next module
virtual int open (void);
- //!< Initialize the module, setting up the next module
+ //! Close the module.
virtual int close (void);
- //!< Close the module.
+ //! Push data through the stack
virtual int data (ACE_RMCast::Data &);
- //!< Push data through the stack
+ //! Push a polling request through the stack
virtual int poll (ACE_RMCast::Poll &);
- //!< Push a polling request through the stack
+ //! Push a message to ack a join request through the stack
virtual int ack_join (ACE_RMCast::Ack_Join &);
- //!< Push a message to ack a join request through the stack
+ //! Push a message to ack a leave request through the stack
virtual int ack_leave (ACE_RMCast::Ack_Leave &);
- //!< Push a message to ack a leave request through the stack
+ //! Push an ack mesage through the stack
virtual int ack (ACE_RMCast::Ack &);
- //!< Push an ack mesage through the stack
+ //! Push a join message through the stack
virtual int join (ACE_RMCast::Join &);
- //!< Push a join message through the stack
+ //! Push a leave message through the stack
virtual int leave (ACE_RMCast::Leave &);
- //!< Push a leave message through the stack
private:
//! The next element in the stack
ACE_RMCast_Module *next_;
+
//! The previous element in the stack
ACE_RMCast_Module *prev_;
};
diff --git a/ace/RMCast/RMCast_Module_Factory.h b/ace/RMCast/RMCast_Module_Factory.h
index 722ad87d678..f0ea58df0e5 100644
--- a/ace/RMCast/RMCast_Module_Factory.h
+++ b/ace/RMCast/RMCast_Module_Factory.h
@@ -27,19 +27,40 @@
class ACE_RMCast_Module;
class ACE_RMCast_IO_UDP;
+//! Create Module stacks
+/*!
+ * Different application will probably require different
+ * configurations in their Module stack, some will just want best
+ * effort semantics. Others will use Reliable communication with a
+ * maximum retransmission time. Furthermore, applications may want to
+ * receive messages in send order, or just as soon as they are
+ * received.
+ * Obviously most applications will want to change want happens once a
+ * message is completely received.
+ *
+ * To achieve all this flexibility the IO layer uses this factory to
+ * create the full stack of Modules corresponding to a single
+ * consumer.
+ * To keep the complexity under control the intention is to create
+ * helper Factories, such as Reliable_Module_Factory where
+ * applications only need to customize a few features.
+ */
class ACE_RMCast_Export ACE_RMCast_Module_Factory
{
- // = DESCRIPTION
- //
public:
+ //! Destructor
virtual ~ACE_RMCast_Module_Factory (void);
- // Destructor
+ //! Create a new proxy
virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0;
- // Create a new proxy
+ //! Destroy a proxy
+ /*!
+ * Some factories may allocate modules from a pool, or return the
+ * same module for all proxies. Consequently, only the factory
+ * knows how to destroy them.
+ */
virtual void destroy (ACE_RMCast_Module *) = 0;
- // Destroy a proxy
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Partial_Message.h b/ace/RMCast/RMCast_Partial_Message.h
index 9b71eb4a541..88fa9ab2f1a 100644
--- a/ace/RMCast/RMCast_Partial_Message.h
+++ b/ace/RMCast/RMCast_Partial_Message.h
@@ -26,44 +26,72 @@
#define ACE_RMCAST_DEFAULT_HOLE_COUNT 16
#endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */
+//! Represent a partially received message in the
+//! ACE_RMCast_Reassembly module
+/*!
+ * This class provides temporary storage for the fragments as they are
+ * received in the ACE_RMCast_Reassembly module. It also keeps track
+ * of what portions of the message are still missing.
+ */
class ACE_RMCast_Export ACE_RMCast_Partial_Message
{
public:
+ //! Constructor, reserve enough memory for the complete message
ACE_RMCast_Partial_Message (ACE_UINT32 message_size);
+
+ //! Destructor
~ACE_RMCast_Partial_Message (void);
+ //! Process a fragment
+ /*!
+ * A fragment starting at <offset> has been received, copy the
+ * fragment contents and update the list of holes.
+ */
int fragment_received (ACE_UINT32 message_size,
ACE_UINT32 offset,
ACE_Message_Block *mb);
+
+ //! Return 1 if the message is complete
int is_complete (void) const;
+ //! Return the body of the message, the memory is *not* owned by the
+ //! caller
ACE_Message_Block *message_body (void);
- // Return the body of the message, the memory is owned by the
- // class.
private:
+ //! Insert a new hole into the list
+ /*!
+ * The class keeps an array to represent the missing portions of the
+ * message. This method inserts a new hole, i.e. a new element in
+ * the array at index <i>. The <start> and <end> arguments represent
+ * the offsets of the missing portion of the message.
+ */
int insert_hole (size_t i,
ACE_UINT32 start,
ACE_UINT32 end);
- // Insert a new hole into the list
+ //! Remove a hole from the list
int remove_hole (size_t i);
- // Remove a hole from the list
private:
+ //! Maintain the message storage
ACE_Message_Block message_body_;
- // Used to rebuild the body of the message
+ //! Represent a missing portion of a message
struct Hole
{
+ //! Offset where the missing portion of the message starts
ACE_UINT32 start;
+ //! Offset where the missing portion of the message ends
ACE_UINT32 end;
};
+ //! Implement a growing array of Hole structures
+ //@{
Hole *hole_list_;
size_t max_hole_count_;
size_t hole_count_;
- // The current list of holes in the message_body.
+ //@}
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Proxy.cpp b/ace/RMCast/RMCast_Proxy.cpp
index 53d9d0b6726..f6b2bbec5e5 100644
--- a/ace/RMCast/RMCast_Proxy.cpp
+++ b/ace/RMCast/RMCast_Proxy.cpp
@@ -15,9 +15,9 @@ ACE_RMCast_Proxy::~ACE_RMCast_Proxy (void)
}
ACE_UINT32
-ACE_RMCast_Proxy::highest_in_sequence (void) const
+ACE_RMCast_Proxy::next_expected (void) const
{
- return this->highest_in_sequence_;
+ return this->next_expected_;
}
ACE_UINT32
@@ -29,7 +29,7 @@ ACE_RMCast_Proxy::highest_received (void) const
int
ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack)
{
- this->highest_in_sequence_ = ack.highest_in_sequence;
+ this->next_expected_ = ack.next_expected;
this->highest_received_ = ack.highest_received;
return this->ACE_RMCast_Module::ack (ack);
}
diff --git a/ace/RMCast/RMCast_Proxy.h b/ace/RMCast/RMCast_Proxy.h
index 414b74174fb..e0e6afe79b1 100644
--- a/ace/RMCast/RMCast_Proxy.h
+++ b/ace/RMCast/RMCast_Proxy.h
@@ -48,27 +48,28 @@ public:
//! Destructor
virtual ~ACE_RMCast_Proxy (void);
-
- //! Return the highest sequence number received without any losses
- //! before it. Only applies to remote receiver proxies.
+
+ //! Return the next sequence number expected by the peer. Only
+ //! applies to remote receiver proxies.
/*!
- Please read the documentation in ACE_RMCast::Ack
+ * Please read the documentation in ACE_RMCast::Ack
*/
- virtual ACE_UINT32 highest_in_sequence (void) const;
+ virtual ACE_UINT32 next_expected (void) const;
//! Return the highest sequence number successfully received.
//! Only applies to remote receiver proxies.
/*!
- Please read the documentation in ACE_RMCast::Ack
+ * Please read the documentation in ACE_RMCast::Ack
*/
virtual ACE_UINT32 highest_received (void) const;
//@{
//! Send messages directly to the peer.
- /*! Send a message directly to the peer, i.e. the message is not
- sent through the multicast group and it may not be processed by
- all the layers in the stack.
- */
+ /*!
+ * Send a message directly to the peer, i.e. the message is not
+ * sent through the multicast group and it may not be processed by
+ * all the layers in the stack.
+ */
virtual int reply_data (ACE_RMCast::Data &) = 0;
virtual int reply_poll (ACE_RMCast::Poll &) = 0;
virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0;
@@ -79,8 +80,8 @@ public:
//@}
/*!
- Proxies process the ACK sequence numbers to save the sequence
- numbers reported from the remote peer.
+ * Proxies process the ACK sequence numbers to cache the ack
+ * information from the peer.
*/
virtual int ack (ACE_RMCast::Ack &);
@@ -88,7 +89,7 @@ private:
//@{
//! Cache the sequence numbers reported from the remote peer using
//! Ack messages
- ACE_UINT32 highest_in_sequence_;
+ ACE_UINT32 next_expected_;
ACE_UINT32 highest_received_;
//@}
};
diff --git a/ace/RMCast/RMCast_Proxy.i b/ace/RMCast/RMCast_Proxy.i
index f93feaa5639..6fee09fe9e5 100644
--- a/ace/RMCast/RMCast_Proxy.i
+++ b/ace/RMCast/RMCast_Proxy.i
@@ -2,7 +2,7 @@
ACE_INLINE
ACE_RMCast_Proxy::ACE_RMCast_Proxy (void)
- : highest_in_sequence_ (0)
+ : next_expected_ (0)
, highest_received_ (0)
{
}
diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp
index a996e1204d5..7e38cdf7c97 100644
--- a/ace/RMCast/RMCast_Retransmission.cpp
+++ b/ace/RMCast/RMCast_Retransmission.cpp
@@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void)
{
}
+class ACE_RMCast_Resend_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next,
+ ACE_UINT32 max_sequence_number)
+ : n (0)
+ , next_ (next)
+ , max_sequence_number_ (max_sequence_number)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const & item)
+ {
+ if (key > this->max_sequence_number_)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::resend - message %d resent\n",
+ key));
+ ACE_RMCast::Data data = item;
+ int r = this->next_->data (data);
+ if (r != 0)
+ return r;
+ n++;
+ return 0;
+ }
+
+ int n;
+
+private:
+ ACE_RMCast_Module *next_;
+
+ ACE_UINT32 max_sequence_number_;
+};
+
int
-ACE_RMCast_Retransmission::close (void)
+ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number)
{
- Messages_Iterator end = this->messages_.end ();
+ if (this->next () == 0)
+ return 0;
- for (Messages_Iterator i = this->messages_.begin ();
- i != end;
- ++i)
- {
- ACE_Message_Block::release ((*i).item ().payload);
- }
- this->messages_.close ();
+ ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number);
+
+ if (this->messages_.for_each (&worker) == -1)
+ return -1;
+
+ return worker.n;
+}
+
+int
+ACE_RMCast_Retransmission::close (void)
+{
+ // @@
return 0;
}
@@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data)
int r = this->next ()->data (data);
if (r == 0)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
ACE_RMCast::Data copy = data;
copy.payload = ACE_Message_Block::duplicate (data.payload);
r = this->messages_.bind (data.sequence_number, copy);
@@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
ACE_RMCast::Ack_Join ack_join;
+#if 0
+ // @@
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
Messages_Iterator end = this->messages_.end ();
Messages_Iterator begin = this->messages_.begin ();
@@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
ack_join.next_sequence_number = (*begin).key ();
}
}
+#endif
(void) join.source->reply_ack_join (ack_join);
// @@ We should force a full retransmission of all the messages!
@@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
}
+class ACE_RMCast_Ack_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack,
+ ACE_RMCast_Retransmission::Messages::Write_Guard &g,
+ ACE_RMCast_Retransmission::Messages *messages)
+ : ack_ (ack)
+ , ace_mon_ (g)
+ , messages_ (messages)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const &)
+ {
+ if (key >= this->ack_.next_expected)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::ack - message %d erased\n",
+ key));
+ return this->messages_->unbind_i (this->ace_mon_, key);
+ }
+
+private:
+ ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&);
+ ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&);
+
+private:
+ ACE_RMCast::Ack &ack_;
+
+ ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_;
+
+ ACE_RMCast_Retransmission::Messages *messages_;
+};
+
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- for (Messages_Iterator i = this->messages_.begin ();
- i != this->messages_.end ();
- /* do nothing */)
- {
- ACE_UINT32 key = (*i).key ();
- if (key > ack.highest_in_sequence)
- break;
- this->messages_.unbind (key);
- }
- return 0;
+ Messages::Write_Guard ace_mon (this->messages_.mutex_,
+ this->messages_.cond_,
+ this->messages_.pending_writes_,
+ this->messages_.writing_,
+ this->messages_.collection_);
+
+ ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
+
+ return this->messages_.for_each (&worker);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
@@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC
template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
+template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>;
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_Retransmission.h b/ace/RMCast/RMCast_Retransmission.h
index 7c586fe5dd6..b7bc20d2914 100644
--- a/ace/RMCast/RMCast_Retransmission.h
+++ b/ace/RMCast/RMCast_Retransmission.h
@@ -19,6 +19,7 @@
#include "ace/pre.h"
#include "RMCast_Module.h"
+#include "RMCast_Copy_On_Write.h"
#include "ace/RB_Tree.h"
#include "ace/Synch.h"
@@ -26,38 +27,72 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+//! Store messages for retransmission in reliable configurations
+/*!
+ * Reliable configurations of the RMCast framework need to store
+ * messages on the sender side to resend them if one or more clients
+ * do not receive them successfully.
+ */
class ACE_RMCast_Export ACE_RMCast_Retransmission : public ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Retransmission
- //
- // = DESCRIPTION
- // Define the interface for all reliable multicast retransmission
public:
// = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Retransmission (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Retransmission (void);
- // Destructor
- // = The RMCast_Module methods
+ //! Use a Red-Black Tree to keep the queue of messages
+ typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection;
+ typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection_Iterator;
+
+ //! The messages are stored in the Copy_On_Write wrapper to provide
+ //! an efficient, but thread safe interface.
+ typedef ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,Collection,Collection_Iterator> Messages;
+
+ //! Resend messages
+ /*!
+ * Resends all the messages up to \param max_sequence_number
+ * Returns the number of messages sent, or -1 if there where any
+ * errors.
+ */
+ int resend (ACE_UINT32 max_sequence_number);
+
+ //! Cleanup all the stored messages
virtual int close (void);
+
+ //! Pass the message downstream, but also save it in the
+ //! retransmission queue
+ /*!
+ * Sequence number are assigned by the ACE_RMCast_Fragmentation
+ * class, consequently this class first passes the message
+ * downstream, to obtain the sequence number and then stores the
+ * message for later retransmission.
+ */
virtual int data (ACE_RMCast::Data &data);
+
+ //! Process an Ack message from the remote receivers.
+ /*!
+ * Normally this Ack message will be a summary of all the Ack
+ * messages received by the ACE_RMCast_Membership class
+ */
virtual int ack (ACE_RMCast::Ack &);
+
+ //! Detect when new members join the group and Ack_Join them
+ /*!
+ * When a new receiver joins the group this module sends an Ack_Join
+ * message with the next sequence number that the receiver should
+ * expect.
+ * The sequence number is obtained from the current list of cached
+ * messages.
+ */
virtual int join (ACE_RMCast::Join &);
protected:
- typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>
- Messages;
- typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>
- Messages_Iterator;
+ //! The retransmission buffer
Messages messages_;
- // The retransmission buffer
-
- ACE_SYNCH_MUTEX mutex_;
- // Synchronization
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_UDP_Event_Handler.h b/ace/RMCast/RMCast_UDP_Event_Handler.h
index 02798cee7f8..2a6e7c45d42 100644
--- a/ace/RMCast/RMCast_UDP_Event_Handler.h
+++ b/ace/RMCast/RMCast_UDP_Event_Handler.h
@@ -1,16 +1,5 @@
// $Id$
-// ============================================================================
-//
-// = DESCRIPTION
-// Implement an adapter between the ACE Reactor and the
-// ACE_RMCast_IO_UDP
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
#ifndef ACE_RMCAST_UDP_EVENT_HANDLER_H
#define ACE_RMCAST_UDP_EVENT_HANDLER_H
#include "ace/pre.h"
@@ -25,24 +14,41 @@
class ACE_RMCast_IO_UDP;
class ACE_INET_Addr;
+//! Implement an Adapter for the ACE_RMCast_IO_UDP class
+/*!
+ * Applications may wish to use the ACE_Reactor to demultiplex I/O
+ * events for an ACE_RMCast_IO_UDP object. However other application
+ * may choose to make ACE_RMCast_IO_UDP active, or they may dedicate
+ * their own threads for its events.
+ * To avoid couplin ACE_RMCast_IO_UDP with the Reactor we don't make
+ * it derived from ACE_Event_Handler or any other class in the Reactor
+ * framework, instead, this simple Adapter can forward the Reactor
+ * messages to an ACE_RMCast_IO_UDP object.
+ */
class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler
{
public:
- ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver);
- // Constructor
-
+ //! Constructor, save io_udp as the Adaptee in the Adapter pattern.
+ ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io_udp);
+
+ //! Destructor
+ /*!
+ * Notice that this class does not own the ACE_RMCast_IO_UDP
+ * adaptee, so it does not destroy it.
+ */
~ACE_RMCast_UDP_Event_Handler (void);
- // Destructor
- // = The Event_Handler methods
+ //@{
+ //! Documented in ACE_Event_Handler class
virtual ACE_HANDLE get_handle (void) const;
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_timeout (const ACE_Time_Value &current_time,
const void *act = 0);
+ //@}
private:
+ //! The adaptee
ACE_RMCast_IO_UDP *io_udp_;
- // The sender
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_UDP_Proxy.cpp b/ace/RMCast/RMCast_UDP_Proxy.cpp
index 010267b0cbb..2eb0983b171 100644
--- a/ace/RMCast/RMCast_UDP_Proxy.cpp
+++ b/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -123,7 +123,7 @@ ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size)
ACE_OS::memcpy (&tmp, buffer + 1,
sizeof(tmp));
- ack.highest_in_sequence = ACE_NTOHL (tmp);
+ ack.next_expected = ACE_NTOHL (tmp);
ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32),
sizeof(tmp));
ack.highest_received = ACE_NTOHL (tmp);
@@ -175,4 +175,3 @@ ACE_RMCast_UDP_Proxy::reply_leave (ACE_RMCast::Leave &leave)
{
return this->io_udp_->send_leave (leave, this->peer_addr_);
}
-
diff --git a/ace/RMCast/RMCast_Worker.cpp b/ace/RMCast/RMCast_Worker.cpp
new file mode 100644
index 00000000000..06254b8c0f6
--- /dev/null
+++ b/ace/RMCast/RMCast_Worker.cpp
@@ -0,0 +1,19 @@
+// $Id$
+
+#ifndef ACE_RMCAST_WORKER_CPP
+#define ACE_RMCAST_WORKER_CPP
+
+#include "RMCast_Worker.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "RMCast_Worker.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(RMCast, RMCast_Worker, "$Id$")
+
+template<class KEY, class ITEM>
+ACE_RMCast_Worker<KEY,ITEM>::~ACE_RMCast_Worker (void)
+{
+}
+
+#endif /* ACE_RMCAST_WORKER_CPP */
diff --git a/ace/RMCast/RMCast_Worker.h b/ace/RMCast/RMCast_Worker.h
new file mode 100644
index 00000000000..d3eb3032ebc
--- /dev/null
+++ b/ace/RMCast/RMCast_Worker.h
@@ -0,0 +1,36 @@
+/* -*- C++ -*- */
+// $Id$
+//
+
+#ifndef ACE_RMCAST_WORKER_H
+#define ACE_RMCAST_WORKER_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class KEY, class ITEM>
+class ACE_RMCast_Worker
+{
+public:
+ virtual ~ACE_RMCast_Worker (void);
+
+ virtual int work (KEY const & key,
+ ITEM const & item) = 0;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Worker.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Worker.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Worker.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_RMCAST_WORKER_H */
diff --git a/ace/RMCast/RMCast_Worker.i b/ace/RMCast/RMCast_Worker.i
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/ace/RMCast/RMCast_Worker.i
@@ -0,0 +1 @@
+// $Id$