summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog22
-rw-r--r--ChangeLogs/ChangeLog-02a22
-rw-r--r--ChangeLogs/ChangeLog-03a22
-rw-r--r--ace/RMCast/Makefile6
-rw-r--r--ace/RMCast/RMCast.h292
-rw-r--r--ace/RMCast/RMCast_Membership.cpp24
-rw-r--r--ace/RMCast/RMCast_Module.h49
-rw-r--r--ace/RMCast/RMCast_Proxy.cpp8
-rw-r--r--ace/RMCast/RMCast_Proxy.h55
-rw-r--r--ace/RMCast/RMCast_Reassembly.cpp5
-rw-r--r--ace/RMCast/RMCast_Reassembly.h16
-rw-r--r--ace/RMCast/RMCast_Retransmission.cpp3
-rw-r--r--protocols/ace/RMCast/Makefile6
-rw-r--r--protocols/ace/RMCast/RMCast.h292
-rw-r--r--protocols/ace/RMCast/RMCast_Membership.cpp24
-rw-r--r--protocols/ace/RMCast/RMCast_Module.h49
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.cpp8
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.h55
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.cpp5
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.h16
-rw-r--r--protocols/ace/RMCast/RMCast_Retransmission.cpp3
-rw-r--r--tests/RMCast/Makefile196
-rw-r--r--tests/RMCast/RMCast_Membership_Test.cpp449
23 files changed, 1281 insertions, 346 deletions
diff --git a/ChangeLog b/ChangeLog
index a4b4d34f6a8..b77525d55c1 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,25 @@
+Wed Sep 27 08:23:58 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/Makefile:
+ Updated dependencies
+
+ * ace/RMCast/RMCast.h:
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Proxy.cpp:
+ * ace/RMCast/RMCast_Proxy.h:
+ * ace/RMCast/RMCast_Reassembly.h:
+ * ace/RMCast/RMCast_Reassembly.cpp:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Update comments to be doxygen friendly
+
+ * ace/RMCast/RMCast_Membership.cpp:
+ Fixed problems in Ack management, we were stopping the useful
+ Acks, not the ones that just represented repeated information.
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Membership_Test.cpp:
+ Add test for the ACE_RMCast_Membership class.
+
Tue Sep 26 22:39:42 2000 Darrell Brunsch <brunsch@uci.edu>
* examples/Reactor/WFMO_Reactor/test_abandoned.cpp:
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index a4b4d34f6a8..b77525d55c1 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,25 @@
+Wed Sep 27 08:23:58 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/Makefile:
+ Updated dependencies
+
+ * ace/RMCast/RMCast.h:
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Proxy.cpp:
+ * ace/RMCast/RMCast_Proxy.h:
+ * ace/RMCast/RMCast_Reassembly.h:
+ * ace/RMCast/RMCast_Reassembly.cpp:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Update comments to be doxygen friendly
+
+ * ace/RMCast/RMCast_Membership.cpp:
+ Fixed problems in Ack management, we were stopping the useful
+ Acks, not the ones that just represented repeated information.
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Membership_Test.cpp:
+ Add test for the ACE_RMCast_Membership class.
+
Tue Sep 26 22:39:42 2000 Darrell Brunsch <brunsch@uci.edu>
* examples/Reactor/WFMO_Reactor/test_abandoned.cpp:
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index a4b4d34f6a8..b77525d55c1 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,25 @@
+Wed Sep 27 08:23:58 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/Makefile:
+ Updated dependencies
+
+ * ace/RMCast/RMCast.h:
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Proxy.cpp:
+ * ace/RMCast/RMCast_Proxy.h:
+ * ace/RMCast/RMCast_Reassembly.h:
+ * ace/RMCast/RMCast_Reassembly.cpp:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Update comments to be doxygen friendly
+
+ * ace/RMCast/RMCast_Membership.cpp:
+ Fixed problems in Ack management, we were stopping the useful
+ Acks, not the ones that just represented repeated information.
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Membership_Test.cpp:
+ Add test for the ACE_RMCast_Membership class.
+
Tue Sep 26 22:39:42 2000 Darrell Brunsch <brunsch@uci.edu>
* examples/Reactor/WFMO_Reactor/test_abandoned.cpp:
diff --git a/ace/RMCast/Makefile b/ace/RMCast/Makefile
index 3241d84bca5..ae13792c4c4 100644
--- a/ace/RMCast/Makefile
+++ b/ace/RMCast/Makefile
@@ -74,7 +74,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
.obj/RMCast_Module.o .obj/RMCast_Module.so .shobj/RMCast_Module.o .shobj/RMCast_Module.so: RMCast_Module.cpp RMCast_Module.h \
$(ACE_ROOT)/ace/pre.h \
- RMCast.h $(ACE_ROOT)/ace/OS.h \
+ RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
$(ACE_ROOT)/ace/post.h \
$(ACE_ROOT)/ace/ACE_export.h \
$(ACE_ROOT)/ace/svc_export.h \
@@ -99,7 +100,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
.obj/RMCast_Module_Factory.o .obj/RMCast_Module_Factory.so .shobj/RMCast_Module_Factory.o .shobj/RMCast_Module_Factory.so: RMCast_Module_Factory.cpp \
RMCast_Module_Factory.h \
$(ACE_ROOT)/ace/pre.h \
- RMCast.h $(ACE_ROOT)/ace/OS.h \
+ RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
$(ACE_ROOT)/ace/post.h \
$(ACE_ROOT)/ace/ACE_export.h \
$(ACE_ROOT)/ace/svc_export.h \
diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h
index c7cdf718c04..abf2a24e946 100644
--- a/ace/RMCast/RMCast.h
+++ b/ace/RMCast/RMCast.h
@@ -28,69 +28,22 @@
class ACE_Message_Block;
class ACE_RMCast_Proxy;
+//! The RMCast namespace
+/*!
+ Several simple data structures and enums are shared by all the
+ RMCast components, this is the place where we put them by default.
+*/
class ACE_RMCast_Export ACE_RMCast
{
public:
- // Message formats
-
- // From SENDER to RECEIVER
- //
- // POLL
- // +---------+----------------------+
- // | 8 bits | MT_POLL |
- // +---------+----------------------+
- //
- // ACK_JOIN
- // +---------+----------------------+
- // | 8 bits | MT_ACK_JOIN |
- // +---------+----------------------+
- // | 32 bits | next_sequence_number |
- // +---------+----------------------+
- //
- // ACK_LEAVE
- // +---------+----------------------+
- // | 8 bits | ACK_LEAVE |
- // +---------+----------------------+
- //
- // DATA
- // +---------+----------------------+
- // | 8 bits | DATA |
- // +---------+----------------------+
- // | 32 bits | sequence_number |
- // +---------+----------------------+
- // | 32 bits | message_size |
- // +---------+----------------------+
- // | 32 bits | fragment_offset |
- // +---------+----------------------+
- // ? ? ? ? ? | 32 bits | payload_size |
- // ? ? ? ? ? +---------+----------------------+
- // | | payload |
- // +---------+----------------------+
- //
-
- // From RECEIVER to SENDER
- //
- // MT_JOIN
- // +---------+----------------------+
- // | 8 bits | MT_JOIN |
- // +---------+----------------------+
- //
- // MT_LEAVE
- // +---------+----------------------+
- // | 8 bits | MT_LEAVE |
- // +---------+----------------------+
- //
- // MT_ACK
- // +---------+----------------------+
- // | 8 bits | MT_ACK |
- // +---------+----------------------+
- // | 32 bits | highest_in_sequence |
- // +---------+----------------------+
- // | 32 bits | highest_received |
- // +---------+----------------------+
- //
-
+ //! The message types
+ /*!
+ Each message includes a type field in the header used by the
+ receiver to correctly parse it.
+ Classes with the same name as the message type describe the actual
+ format of the message.
+ */
enum Message_Type
{
// Sender initiated
@@ -105,6 +58,41 @@ public:
MT_LAST
};
+ //! Simple enum used to describe the receiver state transitions
+ /*!
+ Receivers go through several states before they can fully accept
+ messages, the following comments describe those states, as well as
+ the possible transitions
+ This configuration is pesimistic, any invalid message is cause
+ enough to reclaim all the resources. This partially addresses
+ situations where either accidentally or intentionally a sender is
+ multicasting packets to the wrong group.
+
+ <CODE>
+ NON_EXISTENT JOINING JOINED LEAVING<BR>
+ ----------------------------------------------------------------<BR>
+ POLL JOINING JOINING JOINED LEAVING<BR>
+ Send/Join Send/Join Send/Ack Send/Leave<BR>
+ <BR>
+ ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ ACK_JOIN JOINING JOINED JOINED LEAVING<BR>
+ Send/Join Update ACT Update ACT Send/Leave<BR>
+ <BR>
+ ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ SEND_DATA JOINING JOINING JOINED LEAVING<BR>
+ Send/Join Send/Join Recv/Data Send/Leave<BR>
+ </CODE>
+ */
enum Receiver_State
{
RS_NON_EXISTENT,
@@ -113,81 +101,75 @@ public:
RS_LEAVING
};
- // State transition (and actions) for the receivers.
- // This configuration is pesimistic, any invalid message is cause
- // enough to reclaim all the resources. This partially addresses
- // situations where either accidentally or intentionally a sender is
- // multicasting packets to the wrong group.
- //
- // NON_EXISTENT JOINING JOINED LEAVING
- // ----------------------------------------------------------------
- // POLL JOINING JOINING JOINED LEAVING
- // Send/Join Send/Join Send/Ack Send/Leave
- //
- // ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // ACK_JOIN JOINING JOINED JOINED LEAVING
- // Send/Join Update ACT Update ACT Send/Leave
- //
- // ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // SEND_DATA JOINING JOINING JOINED LEAVING
- // Send/Join Send/Join Recv/Data Send/Leave
- //
+ //! Simle enum used to describe the state transitions for senders
+ /*!
+ State transition (and actions) for the senders.
+ This configuration is pesimistic, any invalid message is cause
+ enough to reclaim all the resources. This partially addresses
+ situations where either accidentally or intentionally a sender is
+ multicasting packets to the wrong group.
+
+ <CODE>
+ NON_EXISTENT JOINED<BR>
+ ------------------------------------------<BR>
+ POLL NON_EXISTENT NON_EXISTENT<BR>
+ Destroy Destroy<BR>
+ <BR>
+ ACK NON_EXISTENT JOINED<BR>
+ Noop Process/Ack<BR>
+ <BR>
+ JOIN JOINED NON_EXISTENT<BR>
+ Send/Join_Ack Send/Join_Ack<BR>
+ <BR>
+ LEAVE NON_EXISTENT NON_EXISTENT<BR>
+ Send/Leave_Ack Send/Leave_Ack<BR>
+ Destroy<BR>
+ <BR>
+ ACK_JOIN NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy<BR>
+ <BR>
+ ACK_LEAVE NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy<BR>
+ <BR>
+ SEND_DATA NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy<BR>
+ </CODE>
+ */
enum Sender_State
{
SS_NON_EXISTENT,
SS_JOINED
};
- // State transition (and actions) for the senders.
- // This configuration is pesimistic, any invalid message is cause
- // enough to reclaim all the resources. This partially addresses
- // situations where either accidentally or intentionally a sender is
- // multicasting packets to the wrong group.
- //
- // NON_EXISTENT JOINED
- // ------------------------------------------
- // POLL NON_EXISTENT NON_EXISTENT
- // Destroy Destroy
- //
- // ACK NON_EXISTENT JOINED
- // Noop Process/Ack
- //
- // JOIN JOINED NON_EXISTENT
- // Send/Join_Ack Send/Join_Ack
- //
- // LEAVE NON_EXISTENT NON_EXISTENT
- // Send/Leave_Ack Send/Leave_Ack
- // Destroy
- //
- // ACK_JOIN NON_EXISTENT NON_EXISTENT
- // Noop Destroy
- //
- // ACK_LEAVE NON_EXISTENT NON_EXISTENT
- // Noop Destroy
- //
- // SEND_DATA NON_EXISTENT NON_EXISTENT
- // Noop Destroy
- //
-
// These structures define the basic layout of the messages.
+
+ //! This is the main message sent by senders
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | DATA |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | sequence_number |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | message_size |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | fragment_offset |<BR>
+ +---------+----------------------+<BR>
+ ? ? ? ? ? | 32 bits | payload_size |<BR>
+ ? ? ? ? ? +---------+----------------------+<BR>
+ | | payload |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Data
{
// Source ID is implicit in recvfrom()...
ACE_UINT32 sequence_number;
ACE_UINT32 total_size;
ACE_UINT32 fragment_offset;
+
// @@ TODO: we may want to add optional fields, such as:
// - Polling clients for their status
// - Sending the range of messages in the queue
@@ -196,41 +178,113 @@ public:
ACE_Message_Block *payload;
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_POLL |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Poll
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Receivers accept new members using this message
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_ACK_JOIN |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | next_sequence_number |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Ack_Join
{
ACE_INT32 next_sequence_number;
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Senders acknowledge when receivers try to leave
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | ACK_LEAVE |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Ack_Leave
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Provide feedback to the sender about messages received and sent
+ //! so far.
+ /*!
+ *
+ * This message is used to provide feedback information to senders.
+ * It contains two sequence numbers:
+ * - highest_in_sequence: is the sequence number of the last message
+ * received without any lost messages before it
+ * - highest_received: is the sequence number of the last_message
+ * successfully received, there may be some messages lost before it
+ *
+ * <CODE>
+ * +---------+----------------------+<BR>
+ * | 8 bits | MT_ACK |<BR>
+ * +---------+----------------------+<BR>
+ * | 32 bits | highest_in_sequence |<BR>
+ * +---------+----------------------+<BR>
+ * | 32 bits | highest_received |<BR>
+ * +---------+----------------------+<BR>
+ * </CODE>
+ */
struct Ack
{
+ //! The last message received without any losses before it.
ACE_UINT32 highest_in_sequence;
+
+ //! The last message successfully received
ACE_UINT32 highest_received;
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Receivers send this message to indicate they want to join
+ /*
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_JOIN |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Join
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Receivers send this message to disconnect gracefully
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_LEAVE |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Leave
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
};
diff --git a/ace/RMCast/RMCast_Membership.cpp b/ace/RMCast/RMCast_Membership.cpp
index 8ab58d84475..6ee2690a41f 100644
--- a/ace/RMCast/RMCast_Membership.cpp
+++ b/ace/RMCast/RMCast_Membership.cpp
@@ -18,11 +18,13 @@ ACE_RMCast_Membership::~ACE_RMCast_Membership (void)
int
ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
{
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack\n"));
Proxy_Iterator end = this->proxies_.end ();
Proxy_Iterator i = this->proxies_.begin ();
if (i == end)
return 0;
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[2]\n"));
ACE_RMCast::Ack next_ack;
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
@@ -30,11 +32,13 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
{
// @@ This violates an invariant of the class, shouldn't
// happen...
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n"));
return -1;
}
else if (ack.highest_in_sequence == this->highest_in_sequence_)
{
// Nothing new, just continue....
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n"));
return 0;
}
// Possible update, re-evaluate the story...
@@ -52,12 +56,15 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
if (r > highest_received)
highest_received = r;
}
+#if 0
if (this->highest_in_sequence_ >= highest_in_sequence
- || this->highest_received_ < highest_received)
+ || this->highest_received_ >= highest_received)
{
// No change....
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[5]\n"));
return 0;
}
+#endif /* 0 */
this->highest_in_sequence_ = highest_in_sequence;
this->highest_received_ = highest_received;
if (this->next () == 0)
@@ -84,10 +91,7 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join)
return -1;
}
- if (this->next () == 0)
- return 0;
-
- return this->next ()->join (join);
+ return this->ACE_RMCast_Module::join (join);
}
int
@@ -98,18 +102,16 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- if (this->proxies_.remove (leave.source) == -1)
- return 0;
+ (void) this->proxies_.remove (leave.source);
}
- if (this->next () == 0)
- return 0;
-
- return this->next ()->leave (leave);
+ return this->ACE_RMCast_Module::leave (leave);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Unbounded_Set<ACE_RMCast_Proxy*>;
+template class ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*>;
+template class ACE_Node<ACE_RMCast_Proxy*>;
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_Module.h b/ace/RMCast/RMCast_Module.h
index 9f83c2d5be4..dc4077fa4ab 100644
--- a/ace/RMCast/RMCast_Module.h
+++ b/ace/RMCast/RMCast_Module.h
@@ -27,51 +27,66 @@
class ACE_Message_Block;
class ACE_Time_Value;
+//! Reliable Multicast Module
+/*!
+ The reliable multicast protocol is implemented as a stack of
+ "Modules" each one performing one specific task. In short, this is
+ an instance of the pipes-and-filters architectural pattern.
+*/
class ACE_RMCast_Export ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Module
- //
- // = DESCRIPTION
- // The reliable multicast protocol is implemented as a stack of
- // "Modules" each one performing one specific task.
- // In short, this is an instance of the pipes-and-filters
- // architectural pattern.
- //
public:
// = Initialization and termination methods.
ACE_RMCast_Module (void);
- // Constructor
+ //!< Constructor
virtual ~ACE_RMCast_Module (void);
- // Destructor
+ //!< Destructor
virtual int next (ACE_RMCast_Module *next);
+ //!< Modifier for the next element in the stack
+
virtual ACE_RMCast_Module* next (void) const;
+ //!< Accesor for the next element in the stack
+
virtual int prev (ACE_RMCast_Module *prev);
+ //!< Modifier for the previous element in the stack
+
virtual ACE_RMCast_Module* prev (void) const;
- // Modifiers and accessors for the previous and next module in the
- // stack
+ //!< Accesor for the previous element in the stack
virtual int open (void);
- // Initialize the module, setting up the next module
+ //!< Initialize the module, setting up the next module
virtual int close (void);
- // Close the module.
+ //!< Close the module.
virtual int data (ACE_RMCast::Data &);
+ //!< Push data through the stack
+
virtual int poll (ACE_RMCast::Poll &);
+ //!< Push a polling request through the stack
+
virtual int ack_join (ACE_RMCast::Ack_Join &);
+ //!< Push a message to ack a join request through the stack
+
virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ //!< Push a message to ack a leave request through the stack
+
virtual int ack (ACE_RMCast::Ack &);
+ //!< Push an ack mesage through the stack
+
virtual int join (ACE_RMCast::Join &);
+ //!< Push a join message through the stack
+
virtual int leave (ACE_RMCast::Leave &);
- // Push data down the stack
+ //!< Push a leave message through the stack
private:
+ //! The next element in the stack
ACE_RMCast_Module *next_;
+ //! The previous element in the stack
ACE_RMCast_Module *prev_;
- // The next and previous modules in the stack
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Proxy.cpp b/ace/RMCast/RMCast_Proxy.cpp
index 2a8b2ba40b4..53d9d0b6726 100644
--- a/ace/RMCast/RMCast_Proxy.cpp
+++ b/ace/RMCast/RMCast_Proxy.cpp
@@ -25,3 +25,11 @@ ACE_RMCast_Proxy::highest_received (void) const
{
return this->highest_received_;
}
+
+int
+ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack)
+{
+ this->highest_in_sequence_ = ack.highest_in_sequence;
+ this->highest_received_ = ack.highest_received;
+ return this->ACE_RMCast_Module::ack (ack);
+}
diff --git a/ace/RMCast/RMCast_Proxy.h b/ace/RMCast/RMCast_Proxy.h
index dca6494e374..414b74174fb 100644
--- a/ace/RMCast/RMCast_Proxy.h
+++ b/ace/RMCast/RMCast_Proxy.h
@@ -27,28 +27,48 @@
class ACE_Message_Block;
class ACE_Time_Value;
+//! Local representation for remote peers
+/*!
+ Both senders and receivers in the multicast group need to maintain
+ explicit representations of their "peers". For example, a sender
+ needs to know the list of all the receivers and what messages they
+ have reported as successfully received.
+ Likewise, the receiver needs to maintain separate state for each
+ remote sender, and must be able to disconnect from all of them
+ gracefully when needed.
+ The RMCast_Proxy class is an opaque representation of such a peer,
+ and hides all the networking details from the rest of the system.
+*/
class ACE_RMCast_Export ACE_RMCast_Proxy : public ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Proxy
- //
- // = DESCRIPTION
- // The proxy is used to send back messages to either a single
- // receiver (o supplier).
- //
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Proxy (void);
// Constructor
+ //! Destructor
virtual ~ACE_RMCast_Proxy (void);
- // Destructor
+ //! Return the highest sequence number received without any losses
+ //! before it. Only applies to remote receiver proxies.
+ /*!
+ Please read the documentation in ACE_RMCast::Ack
+ */
virtual ACE_UINT32 highest_in_sequence (void) const;
+
+ //! Return the highest sequence number successfully received.
+ //! Only applies to remote receiver proxies.
+ /*!
+ Please read the documentation in ACE_RMCast::Ack
+ */
virtual ACE_UINT32 highest_received (void) const;
- // Get the sequence numbers received by the remote proxy.
- // Return 0 for sender proxies
+ //@{
+ //! Send messages directly to the peer.
+ /*! Send a message directly to the peer, i.e. the message is not
+ sent through the multicast group and it may not be processed by
+ all the layers in the stack.
+ */
virtual int reply_data (ACE_RMCast::Data &) = 0;
virtual int reply_poll (ACE_RMCast::Poll &) = 0;
virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0;
@@ -56,16 +76,21 @@ public:
virtual int reply_ack (ACE_RMCast::Ack &) = 0;
virtual int reply_join (ACE_RMCast::Join &) = 0;
virtual int reply_leave (ACE_RMCast::Leave &) = 0;
- // Push data back to the remote proxy
+ //@}
- // = The RMCast_Module methods
+ /*!
+ Proxies process the ACK sequence numbers to save the sequence
+ numbers reported from the remote peer.
+ */
virtual int ack (ACE_RMCast::Ack &);
private:
+ //@{
+ //! Cache the sequence numbers reported from the remote peer using
+ //! Ack messages
ACE_UINT32 highest_in_sequence_;
ACE_UINT32 highest_received_;
- // Cache the sequence numbers reported from the remote peer using
- // Ack messages
+ //@}
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Reassembly.cpp b/ace/RMCast/RMCast_Reassembly.cpp
index ba2e9b79c1a..ed488341bae 100644
--- a/ace/RMCast/RMCast_Reassembly.cpp
+++ b/ace/RMCast/RMCast_Reassembly.cpp
@@ -22,6 +22,10 @@ ACE_RMCast_Reassembly (void)
ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
{
+ /*!<
+ We cleanup the resources in the destructor
+ <B color=red>@@ TODO</B> Why not in the close() operation?
+ */
for (Message_Map_Iterator i = this->messages_.begin ();
i != this->messages_.end ();
++i)
@@ -87,6 +91,7 @@ ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
// Push the message...
ACE_RMCast::Data downstream_data;
+ downstream_data.source = data.source;
downstream_data.sequence_number = data.sequence_number;
downstream_data.total_size = message->message_body ()->length ();
downstream_data.fragment_offset = 0;
diff --git a/ace/RMCast/RMCast_Reassembly.h b/ace/RMCast/RMCast_Reassembly.h
index 0bf0c3a61ee..6dc37e1ae19 100644
--- a/ace/RMCast/RMCast_Reassembly.h
+++ b/ace/RMCast/RMCast_Reassembly.h
@@ -24,19 +24,28 @@
class ACE_RMCast_Partial_Message;
+//! Reassemble multiple data fragments into a single data message
+/*!
+ Data messages may not fit in a single MTU in the transport layer, in
+ that case the application configure a RMCast_Fragment module on the
+ sender side. On the receiver side this layer reassemble the
+ messages sent from a <EM>single</EM> source, and passes the messages
+ up the stream.
+*/
class ACE_RMCast_Export ACE_RMCast_Reassembly : public ACE_RMCast_Module
{
public:
+ //! Constructor
ACE_RMCast_Reassembly (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Reassembly (void);
- // Destructor
// = The ACE_RMCast_Module methods
virtual int data (ACE_RMCast::Data &data);
private:
+ //! A mutex used to synchronize all the internal operations.
ACE_SYNCH_MUTEX mutex_;
typedef
ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>
@@ -45,8 +54,9 @@ private:
ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>
Message_Map_Iterator;
+ //! A map, indexed by sequence number, of the partially received
+ //! messages.
Message_Map messages_;
- // The array of partially received messages
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp
index b90cf0ddf31..a996e1204d5 100644
--- a/ace/RMCast/RMCast_Retransmission.cpp
+++ b/ace/RMCast/RMCast_Retransmission.cpp
@@ -96,6 +96,9 @@ ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Iterator_Base<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile
index 3241d84bca5..ae13792c4c4 100644
--- a/protocols/ace/RMCast/Makefile
+++ b/protocols/ace/RMCast/Makefile
@@ -74,7 +74,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
.obj/RMCast_Module.o .obj/RMCast_Module.so .shobj/RMCast_Module.o .shobj/RMCast_Module.so: RMCast_Module.cpp RMCast_Module.h \
$(ACE_ROOT)/ace/pre.h \
- RMCast.h $(ACE_ROOT)/ace/OS.h \
+ RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
$(ACE_ROOT)/ace/post.h \
$(ACE_ROOT)/ace/ACE_export.h \
$(ACE_ROOT)/ace/svc_export.h \
@@ -99,7 +100,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
.obj/RMCast_Module_Factory.o .obj/RMCast_Module_Factory.so .shobj/RMCast_Module_Factory.o .shobj/RMCast_Module_Factory.so: RMCast_Module_Factory.cpp \
RMCast_Module_Factory.h \
$(ACE_ROOT)/ace/pre.h \
- RMCast.h $(ACE_ROOT)/ace/OS.h \
+ RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
$(ACE_ROOT)/ace/post.h \
$(ACE_ROOT)/ace/ACE_export.h \
$(ACE_ROOT)/ace/svc_export.h \
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h
index c7cdf718c04..abf2a24e946 100644
--- a/protocols/ace/RMCast/RMCast.h
+++ b/protocols/ace/RMCast/RMCast.h
@@ -28,69 +28,22 @@
class ACE_Message_Block;
class ACE_RMCast_Proxy;
+//! The RMCast namespace
+/*!
+ Several simple data structures and enums are shared by all the
+ RMCast components, this is the place where we put them by default.
+*/
class ACE_RMCast_Export ACE_RMCast
{
public:
- // Message formats
-
- // From SENDER to RECEIVER
- //
- // POLL
- // +---------+----------------------+
- // | 8 bits | MT_POLL |
- // +---------+----------------------+
- //
- // ACK_JOIN
- // +---------+----------------------+
- // | 8 bits | MT_ACK_JOIN |
- // +---------+----------------------+
- // | 32 bits | next_sequence_number |
- // +---------+----------------------+
- //
- // ACK_LEAVE
- // +---------+----------------------+
- // | 8 bits | ACK_LEAVE |
- // +---------+----------------------+
- //
- // DATA
- // +---------+----------------------+
- // | 8 bits | DATA |
- // +---------+----------------------+
- // | 32 bits | sequence_number |
- // +---------+----------------------+
- // | 32 bits | message_size |
- // +---------+----------------------+
- // | 32 bits | fragment_offset |
- // +---------+----------------------+
- // ? ? ? ? ? | 32 bits | payload_size |
- // ? ? ? ? ? +---------+----------------------+
- // | | payload |
- // +---------+----------------------+
- //
-
- // From RECEIVER to SENDER
- //
- // MT_JOIN
- // +---------+----------------------+
- // | 8 bits | MT_JOIN |
- // +---------+----------------------+
- //
- // MT_LEAVE
- // +---------+----------------------+
- // | 8 bits | MT_LEAVE |
- // +---------+----------------------+
- //
- // MT_ACK
- // +---------+----------------------+
- // | 8 bits | MT_ACK |
- // +---------+----------------------+
- // | 32 bits | highest_in_sequence |
- // +---------+----------------------+
- // | 32 bits | highest_received |
- // +---------+----------------------+
- //
-
+ //! The message types
+ /*!
+ Each message includes a type field in the header used by the
+ receiver to correctly parse it.
+ Classes with the same name as the message type describe the actual
+ format of the message.
+ */
enum Message_Type
{
// Sender initiated
@@ -105,6 +58,41 @@ public:
MT_LAST
};
+ //! Simple enum used to describe the receiver state transitions
+ /*!
+ Receivers go through several states before they can fully accept
+ messages, the following comments describe those states, as well as
+ the possible transitions
+ This configuration is pesimistic, any invalid message is cause
+ enough to reclaim all the resources. This partially addresses
+ situations where either accidentally or intentionally a sender is
+ multicasting packets to the wrong group.
+
+ <CODE>
+ NON_EXISTENT JOINING JOINED LEAVING<BR>
+ ----------------------------------------------------------------<BR>
+ POLL JOINING JOINING JOINED LEAVING<BR>
+ Send/Join Send/Join Send/Ack Send/Leave<BR>
+ <BR>
+ ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ ACK_JOIN JOINING JOINED JOINED LEAVING<BR>
+ Send/Join Update ACT Update ACT Send/Leave<BR>
+ <BR>
+ ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy Destroy Destroy<BR>
+ <BR>
+ SEND_DATA JOINING JOINING JOINED LEAVING<BR>
+ Send/Join Send/Join Recv/Data Send/Leave<BR>
+ </CODE>
+ */
enum Receiver_State
{
RS_NON_EXISTENT,
@@ -113,81 +101,75 @@ public:
RS_LEAVING
};
- // State transition (and actions) for the receivers.
- // This configuration is pesimistic, any invalid message is cause
- // enough to reclaim all the resources. This partially addresses
- // situations where either accidentally or intentionally a sender is
- // multicasting packets to the wrong group.
- //
- // NON_EXISTENT JOINING JOINED LEAVING
- // ----------------------------------------------------------------
- // POLL JOINING JOINING JOINED LEAVING
- // Send/Join Send/Join Send/Ack Send/Leave
- //
- // ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // ACK_JOIN JOINING JOINED JOINED LEAVING
- // Send/Join Update ACT Update ACT Send/Leave
- //
- // ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT
- // Noop Destroy Destroy Destroy
- //
- // SEND_DATA JOINING JOINING JOINED LEAVING
- // Send/Join Send/Join Recv/Data Send/Leave
- //
+ //! Simle enum used to describe the state transitions for senders
+ /*!
+ State transition (and actions) for the senders.
+ This configuration is pesimistic, any invalid message is cause
+ enough to reclaim all the resources. This partially addresses
+ situations where either accidentally or intentionally a sender is
+ multicasting packets to the wrong group.
+
+ <CODE>
+ NON_EXISTENT JOINED<BR>
+ ------------------------------------------<BR>
+ POLL NON_EXISTENT NON_EXISTENT<BR>
+ Destroy Destroy<BR>
+ <BR>
+ ACK NON_EXISTENT JOINED<BR>
+ Noop Process/Ack<BR>
+ <BR>
+ JOIN JOINED NON_EXISTENT<BR>
+ Send/Join_Ack Send/Join_Ack<BR>
+ <BR>
+ LEAVE NON_EXISTENT NON_EXISTENT<BR>
+ Send/Leave_Ack Send/Leave_Ack<BR>
+ Destroy<BR>
+ <BR>
+ ACK_JOIN NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy<BR>
+ <BR>
+ ACK_LEAVE NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy<BR>
+ <BR>
+ SEND_DATA NON_EXISTENT NON_EXISTENT<BR>
+ Noop Destroy<BR>
+ </CODE>
+ */
enum Sender_State
{
SS_NON_EXISTENT,
SS_JOINED
};
- // State transition (and actions) for the senders.
- // This configuration is pesimistic, any invalid message is cause
- // enough to reclaim all the resources. This partially addresses
- // situations where either accidentally or intentionally a sender is
- // multicasting packets to the wrong group.
- //
- // NON_EXISTENT JOINED
- // ------------------------------------------
- // POLL NON_EXISTENT NON_EXISTENT
- // Destroy Destroy
- //
- // ACK NON_EXISTENT JOINED
- // Noop Process/Ack
- //
- // JOIN JOINED NON_EXISTENT
- // Send/Join_Ack Send/Join_Ack
- //
- // LEAVE NON_EXISTENT NON_EXISTENT
- // Send/Leave_Ack Send/Leave_Ack
- // Destroy
- //
- // ACK_JOIN NON_EXISTENT NON_EXISTENT
- // Noop Destroy
- //
- // ACK_LEAVE NON_EXISTENT NON_EXISTENT
- // Noop Destroy
- //
- // SEND_DATA NON_EXISTENT NON_EXISTENT
- // Noop Destroy
- //
-
// These structures define the basic layout of the messages.
+
+ //! This is the main message sent by senders
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | DATA |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | sequence_number |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | message_size |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | fragment_offset |<BR>
+ +---------+----------------------+<BR>
+ ? ? ? ? ? | 32 bits | payload_size |<BR>
+ ? ? ? ? ? +---------+----------------------+<BR>
+ | | payload |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Data
{
// Source ID is implicit in recvfrom()...
ACE_UINT32 sequence_number;
ACE_UINT32 total_size;
ACE_UINT32 fragment_offset;
+
// @@ TODO: we may want to add optional fields, such as:
// - Polling clients for their status
// - Sending the range of messages in the queue
@@ -196,41 +178,113 @@ public:
ACE_Message_Block *payload;
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_POLL |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Poll
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Receivers accept new members using this message
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_ACK_JOIN |<BR>
+ +---------+----------------------+<BR>
+ | 32 bits | next_sequence_number |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Ack_Join
{
ACE_INT32 next_sequence_number;
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Senders acknowledge when receivers try to leave
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | ACK_LEAVE |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Ack_Leave
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Provide feedback to the sender about messages received and sent
+ //! so far.
+ /*!
+ *
+ * This message is used to provide feedback information to senders.
+ * It contains two sequence numbers:
+ * - highest_in_sequence: is the sequence number of the last message
+ * received without any lost messages before it
+ * - highest_received: is the sequence number of the last_message
+ * successfully received, there may be some messages lost before it
+ *
+ * <CODE>
+ * +---------+----------------------+<BR>
+ * | 8 bits | MT_ACK |<BR>
+ * +---------+----------------------+<BR>
+ * | 32 bits | highest_in_sequence |<BR>
+ * +---------+----------------------+<BR>
+ * | 32 bits | highest_received |<BR>
+ * +---------+----------------------+<BR>
+ * </CODE>
+ */
struct Ack
{
+ //! The last message received without any losses before it.
ACE_UINT32 highest_in_sequence;
+
+ //! The last message successfully received
ACE_UINT32 highest_received;
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Receivers send this message to indicate they want to join
+ /*
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_JOIN |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Join
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
+ //! Receivers send this message to disconnect gracefully
+ /*!
+ <CODE>
+ +---------+----------------------+<BR>
+ | 8 bits | MT_LEAVE |<BR>
+ +---------+----------------------+<BR>
+ </CODE>
+ */
struct Leave
{
+ //! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
};
};
diff --git a/protocols/ace/RMCast/RMCast_Membership.cpp b/protocols/ace/RMCast/RMCast_Membership.cpp
index 8ab58d84475..6ee2690a41f 100644
--- a/protocols/ace/RMCast/RMCast_Membership.cpp
+++ b/protocols/ace/RMCast/RMCast_Membership.cpp
@@ -18,11 +18,13 @@ ACE_RMCast_Membership::~ACE_RMCast_Membership (void)
int
ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
{
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack\n"));
Proxy_Iterator end = this->proxies_.end ();
Proxy_Iterator i = this->proxies_.begin ();
if (i == end)
return 0;
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[2]\n"));
ACE_RMCast::Ack next_ack;
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
@@ -30,11 +32,13 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
{
// @@ This violates an invariant of the class, shouldn't
// happen...
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n"));
return -1;
}
else if (ack.highest_in_sequence == this->highest_in_sequence_)
{
// Nothing new, just continue....
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n"));
return 0;
}
// Possible update, re-evaluate the story...
@@ -52,12 +56,15 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
if (r > highest_received)
highest_received = r;
}
+#if 0
if (this->highest_in_sequence_ >= highest_in_sequence
- || this->highest_received_ < highest_received)
+ || this->highest_received_ >= highest_received)
{
// No change....
+ // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[5]\n"));
return 0;
}
+#endif /* 0 */
this->highest_in_sequence_ = highest_in_sequence;
this->highest_received_ = highest_received;
if (this->next () == 0)
@@ -84,10 +91,7 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join)
return -1;
}
- if (this->next () == 0)
- return 0;
-
- return this->next ()->join (join);
+ return this->ACE_RMCast_Module::join (join);
}
int
@@ -98,18 +102,16 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- if (this->proxies_.remove (leave.source) == -1)
- return 0;
+ (void) this->proxies_.remove (leave.source);
}
- if (this->next () == 0)
- return 0;
-
- return this->next ()->leave (leave);
+ return this->ACE_RMCast_Module::leave (leave);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Unbounded_Set<ACE_RMCast_Proxy*>;
+template class ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*>;
+template class ACE_Node<ACE_RMCast_Proxy*>;
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h
index 9f83c2d5be4..dc4077fa4ab 100644
--- a/protocols/ace/RMCast/RMCast_Module.h
+++ b/protocols/ace/RMCast/RMCast_Module.h
@@ -27,51 +27,66 @@
class ACE_Message_Block;
class ACE_Time_Value;
+//! Reliable Multicast Module
+/*!
+ The reliable multicast protocol is implemented as a stack of
+ "Modules" each one performing one specific task. In short, this is
+ an instance of the pipes-and-filters architectural pattern.
+*/
class ACE_RMCast_Export ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Module
- //
- // = DESCRIPTION
- // The reliable multicast protocol is implemented as a stack of
- // "Modules" each one performing one specific task.
- // In short, this is an instance of the pipes-and-filters
- // architectural pattern.
- //
public:
// = Initialization and termination methods.
ACE_RMCast_Module (void);
- // Constructor
+ //!< Constructor
virtual ~ACE_RMCast_Module (void);
- // Destructor
+ //!< Destructor
virtual int next (ACE_RMCast_Module *next);
+ //!< Modifier for the next element in the stack
+
virtual ACE_RMCast_Module* next (void) const;
+ //!< Accesor for the next element in the stack
+
virtual int prev (ACE_RMCast_Module *prev);
+ //!< Modifier for the previous element in the stack
+
virtual ACE_RMCast_Module* prev (void) const;
- // Modifiers and accessors for the previous and next module in the
- // stack
+ //!< Accesor for the previous element in the stack
virtual int open (void);
- // Initialize the module, setting up the next module
+ //!< Initialize the module, setting up the next module
virtual int close (void);
- // Close the module.
+ //!< Close the module.
virtual int data (ACE_RMCast::Data &);
+ //!< Push data through the stack
+
virtual int poll (ACE_RMCast::Poll &);
+ //!< Push a polling request through the stack
+
virtual int ack_join (ACE_RMCast::Ack_Join &);
+ //!< Push a message to ack a join request through the stack
+
virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ //!< Push a message to ack a leave request through the stack
+
virtual int ack (ACE_RMCast::Ack &);
+ //!< Push an ack mesage through the stack
+
virtual int join (ACE_RMCast::Join &);
+ //!< Push a join message through the stack
+
virtual int leave (ACE_RMCast::Leave &);
- // Push data down the stack
+ //!< Push a leave message through the stack
private:
+ //! The next element in the stack
ACE_RMCast_Module *next_;
+ //! The previous element in the stack
ACE_RMCast_Module *prev_;
- // The next and previous modules in the stack
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp
index 2a8b2ba40b4..53d9d0b6726 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.cpp
+++ b/protocols/ace/RMCast/RMCast_Proxy.cpp
@@ -25,3 +25,11 @@ ACE_RMCast_Proxy::highest_received (void) const
{
return this->highest_received_;
}
+
+int
+ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack)
+{
+ this->highest_in_sequence_ = ack.highest_in_sequence;
+ this->highest_received_ = ack.highest_received;
+ return this->ACE_RMCast_Module::ack (ack);
+}
diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h
index dca6494e374..414b74174fb 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.h
+++ b/protocols/ace/RMCast/RMCast_Proxy.h
@@ -27,28 +27,48 @@
class ACE_Message_Block;
class ACE_Time_Value;
+//! Local representation for remote peers
+/*!
+ Both senders and receivers in the multicast group need to maintain
+ explicit representations of their "peers". For example, a sender
+ needs to know the list of all the receivers and what messages they
+ have reported as successfully received.
+ Likewise, the receiver needs to maintain separate state for each
+ remote sender, and must be able to disconnect from all of them
+ gracefully when needed.
+ The RMCast_Proxy class is an opaque representation of such a peer,
+ and hides all the networking details from the rest of the system.
+*/
class ACE_RMCast_Export ACE_RMCast_Proxy : public ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Proxy
- //
- // = DESCRIPTION
- // The proxy is used to send back messages to either a single
- // receiver (o supplier).
- //
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Proxy (void);
// Constructor
+ //! Destructor
virtual ~ACE_RMCast_Proxy (void);
- // Destructor
+ //! Return the highest sequence number received without any losses
+ //! before it. Only applies to remote receiver proxies.
+ /*!
+ Please read the documentation in ACE_RMCast::Ack
+ */
virtual ACE_UINT32 highest_in_sequence (void) const;
+
+ //! Return the highest sequence number successfully received.
+ //! Only applies to remote receiver proxies.
+ /*!
+ Please read the documentation in ACE_RMCast::Ack
+ */
virtual ACE_UINT32 highest_received (void) const;
- // Get the sequence numbers received by the remote proxy.
- // Return 0 for sender proxies
+ //@{
+ //! Send messages directly to the peer.
+ /*! Send a message directly to the peer, i.e. the message is not
+ sent through the multicast group and it may not be processed by
+ all the layers in the stack.
+ */
virtual int reply_data (ACE_RMCast::Data &) = 0;
virtual int reply_poll (ACE_RMCast::Poll &) = 0;
virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0;
@@ -56,16 +76,21 @@ public:
virtual int reply_ack (ACE_RMCast::Ack &) = 0;
virtual int reply_join (ACE_RMCast::Join &) = 0;
virtual int reply_leave (ACE_RMCast::Leave &) = 0;
- // Push data back to the remote proxy
+ //@}
- // = The RMCast_Module methods
+ /*!
+ Proxies process the ACK sequence numbers to save the sequence
+ numbers reported from the remote peer.
+ */
virtual int ack (ACE_RMCast::Ack &);
private:
+ //@{
+ //! Cache the sequence numbers reported from the remote peer using
+ //! Ack messages
ACE_UINT32 highest_in_sequence_;
ACE_UINT32 highest_received_;
- // Cache the sequence numbers reported from the remote peer using
- // Ack messages
+ //@}
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp
index ba2e9b79c1a..ed488341bae 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.cpp
+++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp
@@ -22,6 +22,10 @@ ACE_RMCast_Reassembly (void)
ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
{
+ /*!<
+ We cleanup the resources in the destructor
+ <B color=red>@@ TODO</B> Why not in the close() operation?
+ */
for (Message_Map_Iterator i = this->messages_.begin ();
i != this->messages_.end ();
++i)
@@ -87,6 +91,7 @@ ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
// Push the message...
ACE_RMCast::Data downstream_data;
+ downstream_data.source = data.source;
downstream_data.sequence_number = data.sequence_number;
downstream_data.total_size = message->message_body ()->length ();
downstream_data.fragment_offset = 0;
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h
index 0bf0c3a61ee..6dc37e1ae19 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.h
+++ b/protocols/ace/RMCast/RMCast_Reassembly.h
@@ -24,19 +24,28 @@
class ACE_RMCast_Partial_Message;
+//! Reassemble multiple data fragments into a single data message
+/*!
+ Data messages may not fit in a single MTU in the transport layer, in
+ that case the application configure a RMCast_Fragment module on the
+ sender side. On the receiver side this layer reassemble the
+ messages sent from a <EM>single</EM> source, and passes the messages
+ up the stream.
+*/
class ACE_RMCast_Export ACE_RMCast_Reassembly : public ACE_RMCast_Module
{
public:
+ //! Constructor
ACE_RMCast_Reassembly (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Reassembly (void);
- // Destructor
// = The ACE_RMCast_Module methods
virtual int data (ACE_RMCast::Data &data);
private:
+ //! A mutex used to synchronize all the internal operations.
ACE_SYNCH_MUTEX mutex_;
typedef
ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>
@@ -45,8 +54,9 @@ private:
ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>
Message_Map_Iterator;
+ //! A map, indexed by sequence number, of the partially received
+ //! messages.
Message_Map messages_;
- // The array of partially received messages
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp
index b90cf0ddf31..a996e1204d5 100644
--- a/protocols/ace/RMCast/RMCast_Retransmission.cpp
+++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp
@@ -96,6 +96,9 @@ ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Iterator_Base<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/tests/RMCast/Makefile b/tests/RMCast/Makefile
index 38dbce49c4d..c1949350a25 100644
--- a/tests/RMCast/Makefile
+++ b/tests/RMCast/Makefile
@@ -11,6 +11,7 @@
BIN = RMCast_Fragment_Test \
RMCast_Reassembly_Test \
RMCast_UDP_Best_Effort_Test \
+ RMCast_Membership_Test
PSRC=$(addsuffix .cpp,$(BIN))
LDLIBS = -lACE_RMCast
@@ -71,16 +72,16 @@ endif
$(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/ACE.h \
$(ACE_ROOT)/ace/ACE.i \
- $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
- $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
- $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
- $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
$(ACE_ROOT)/ace/Synch.i \
$(ACE_ROOT)/ace/Synch_T.h \
$(ACE_ROOT)/ace/Event_Handler.h \
$(ACE_ROOT)/ace/Event_Handler.i \
$(ACE_ROOT)/ace/Synch_T.i \
$(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
$(ACE_ROOT)/ace/Thread.i \
$(ACE_ROOT)/ace/Atomic_Op.i \
$(ACE_ROOT)/ace/Synch_T.cpp \
@@ -124,6 +125,10 @@ endif
$(ACE_ROOT)/ace/Signal.i \
$(ACE_ROOT)/ace/Mem_Map.h \
$(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
$(ACE_ROOT)/ace/Memory_Pool.i \
$(ACE_ROOT)/ace/Thread_Manager.i \
$(ACE_ROOT)/ace/Task.i \
@@ -172,6 +177,8 @@ endif
$(ACE_ROOT)/ace/Service_Types.i \
$(ACE_ROOT)/ace/Service_Repository.i \
$(ACE_ROOT)/ace/WFMO_Reactor.h \
+ $(ACE_ROOT)/ace/Process_Mutex.h \
+ $(ACE_ROOT)/ace/Process_Mutex.inl \
$(ACE_ROOT)/ace/WFMO_Reactor.i \
$(ACE_ROOT)/ace/Strategies.i \
$(ACE_ROOT)/ace/Message_Queue.i \
@@ -216,16 +223,16 @@ endif
$(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/ACE.h \
$(ACE_ROOT)/ace/ACE.i \
- $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
- $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
- $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
- $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
$(ACE_ROOT)/ace/Synch.i \
$(ACE_ROOT)/ace/Synch_T.h \
$(ACE_ROOT)/ace/Event_Handler.h \
$(ACE_ROOT)/ace/Event_Handler.i \
$(ACE_ROOT)/ace/Synch_T.i \
$(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
$(ACE_ROOT)/ace/Thread.i \
$(ACE_ROOT)/ace/Atomic_Op.i \
$(ACE_ROOT)/ace/Synch_T.cpp \
@@ -269,6 +276,10 @@ endif
$(ACE_ROOT)/ace/Signal.i \
$(ACE_ROOT)/ace/Mem_Map.h \
$(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
$(ACE_ROOT)/ace/Memory_Pool.i \
$(ACE_ROOT)/ace/Thread_Manager.i \
$(ACE_ROOT)/ace/Task.i \
@@ -317,6 +328,8 @@ endif
$(ACE_ROOT)/ace/Service_Types.i \
$(ACE_ROOT)/ace/Service_Repository.i \
$(ACE_ROOT)/ace/WFMO_Reactor.h \
+ $(ACE_ROOT)/ace/Process_Mutex.h \
+ $(ACE_ROOT)/ace/Process_Mutex.inl \
$(ACE_ROOT)/ace/WFMO_Reactor.i \
$(ACE_ROOT)/ace/Strategies.i \
$(ACE_ROOT)/ace/Message_Queue.i \
@@ -362,16 +375,16 @@ endif
$(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/ACE.h \
$(ACE_ROOT)/ace/ACE.i \
- $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
- $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
- $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
- $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
$(ACE_ROOT)/ace/Synch.i \
$(ACE_ROOT)/ace/Synch_T.h \
$(ACE_ROOT)/ace/Event_Handler.h \
$(ACE_ROOT)/ace/Event_Handler.i \
$(ACE_ROOT)/ace/Synch_T.i \
$(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
$(ACE_ROOT)/ace/Thread.i \
$(ACE_ROOT)/ace/Atomic_Op.i \
$(ACE_ROOT)/ace/Synch_T.cpp \
@@ -443,6 +456,10 @@ endif
$(ACE_ROOT)/ace/Memory_Pool.h \
$(ACE_ROOT)/ace/Mem_Map.h \
$(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
$(ACE_ROOT)/ace/Memory_Pool.i \
$(ACE_ROOT)/ace/Signal.i \
$(ACE_ROOT)/ace/SString.h \
@@ -491,6 +508,161 @@ endif
$(ACE_ROOT)/ace/Service_Types.i \
$(ACE_ROOT)/ace/Service_Repository.i \
$(ACE_ROOT)/ace/WFMO_Reactor.h \
+ $(ACE_ROOT)/ace/Process_Mutex.h \
+ $(ACE_ROOT)/ace/Process_Mutex.inl \
+ $(ACE_ROOT)/ace/WFMO_Reactor.i \
+ $(ACE_ROOT)/ace/Strategies.i \
+ $(ACE_ROOT)/ace/Message_Queue.i \
+ $(ACE_ROOT)/ace/Task_T.i \
+ $(ACE_ROOT)/ace/Task_T.cpp \
+ $(ACE_ROOT)/ace/Module.h \
+ $(ACE_ROOT)/ace/Module.i \
+ $(ACE_ROOT)/ace/Module.cpp \
+ $(ACE_ROOT)/ace/Stream_Modules.h \
+ $(ACE_ROOT)/ace/Stream_Modules.cpp
+
+.obj/RMCast_Membership_Test.o .obj/RMCast_Membership_Test.so .shobj/RMCast_Membership_Test.o .shobj/RMCast_Membership_Test.so: RMCast_Membership_Test.cpp ../test_config.h \
+ $(ACE_ROOT)/ace/pre.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/svc_export.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/Singleton.h \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ $(ACE_ROOT)/ace/Singleton.i \
+ $(ACE_ROOT)/ace/Singleton.cpp \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Managed_Object.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Module.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Export.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Module.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Membership.h \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Containers_T.h \
+ $(ACE_ROOT)/ace/Containers_T.i \
+ $(ACE_ROOT)/ace/Containers_T.cpp \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.i \
+ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \
+ $(ACE_ROOT)/ace/Based_Pointer_Repository.h \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Free_List.cpp \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Malloc_T.cpp \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Membership.i \
+ $(ACE_ROOT)/ace/Task.h \
+ $(ACE_ROOT)/ace/Service_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.i \
+ $(ACE_ROOT)/ace/Service_Object.i \
+ $(ACE_ROOT)/ace/Thread_Manager.h \
+ $(ACE_ROOT)/ace/Thread_Manager.i \
+ $(ACE_ROOT)/ace/Task.i \
+ $(ACE_ROOT)/ace/Task_T.h \
+ $(ACE_ROOT)/ace/Message_Queue.h \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/Message_Block_T.h \
+ $(ACE_ROOT)/ace/Message_Block_T.i \
+ $(ACE_ROOT)/ace/Message_Block_T.cpp \
+ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \
+ $(ACE_ROOT)/ace/Message_Queue_T.h \
+ $(ACE_ROOT)/ace/Message_Queue_T.i \
+ $(ACE_ROOT)/ace/Message_Queue_T.cpp \
+ $(ACE_ROOT)/ace/Strategies.h \
+ $(ACE_ROOT)/ace/Strategies_T.h \
+ $(ACE_ROOT)/ace/Service_Config.h \
+ $(ACE_ROOT)/ace/SString.h \
+ $(ACE_ROOT)/ace/SString.i \
+ $(ACE_ROOT)/ace/Service_Config.i \
+ $(ACE_ROOT)/ace/Reactor.h \
+ $(ACE_ROOT)/ace/Handle_Set.h \
+ $(ACE_ROOT)/ace/Handle_Set.i \
+ $(ACE_ROOT)/ace/Timer_Queue.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.i \
+ $(ACE_ROOT)/ace/Timer_Queue_T.cpp \
+ $(ACE_ROOT)/ace/Reactor.i \
+ $(ACE_ROOT)/ace/Reactor_Impl.h \
+ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(ACE_ROOT)/ace/Synch_Options.h \
+ $(ACE_ROOT)/ace/Synch_Options.i \
+ $(ACE_ROOT)/ace/Hash_Map_Manager.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.i \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.cpp \
+ $(ACE_ROOT)/ace/Strategies_T.i \
+ $(ACE_ROOT)/ace/Strategies_T.cpp \
+ $(ACE_ROOT)/ace/Service_Repository.h \
+ $(ACE_ROOT)/ace/Service_Types.h \
+ $(ACE_ROOT)/ace/Service_Types.i \
+ $(ACE_ROOT)/ace/Service_Repository.i \
+ $(ACE_ROOT)/ace/WFMO_Reactor.h \
+ $(ACE_ROOT)/ace/Process_Mutex.h \
+ $(ACE_ROOT)/ace/Process_Mutex.inl \
$(ACE_ROOT)/ace/WFMO_Reactor.i \
$(ACE_ROOT)/ace/Strategies.i \
$(ACE_ROOT)/ace/Message_Queue.i \
diff --git a/tests/RMCast/RMCast_Membership_Test.cpp b/tests/RMCast/RMCast_Membership_Test.cpp
new file mode 100644
index 00000000000..54b5f9a5aa8
--- /dev/null
+++ b/tests/RMCast/RMCast_Membership_Test.cpp
@@ -0,0 +1,449 @@
+// $Id$
+
+// ============================================================================
+//
+// = DESCRIPTION
+// Unit test for the UDP sending module of the RMCast library.
+//
+// = AUTHORS
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#include "test_config.h"
+#include "ace/RMCast/RMCast_Proxy.h"
+#include "ace/RMCast/RMCast_Membership.h"
+
+#include "ace/Task.h"
+
+ACE_RCSID(tests, RMCast_Membership_Test, "$Id$")
+
+const size_t message_size = 8 * 1024;
+const int total_message_count = 40;
+
+// ****************************************************************
+
+//! Simple proxy for the ACE_RMCast_Membership test harness
+/*!
+ * Implement a simple version of the ACE_RMCast_Proxy class used in
+ * the ACE_RMCast_Membership test harness.
+ */
+class Test_Proxy : public ACE_RMCast_Proxy
+{
+public:
+ Test_Proxy (void);
+
+ //! Get the flag to remember if this proxy has joined the group or
+ //! not.
+ int joined (void) const
+ {
+ return this->joined_;
+ }
+ //! Set the flag to remember if this proxy has joined the group or
+ //! not.
+ void joined (int j)
+ {
+ this->joined_ = j;
+ }
+
+ //@{
+ //! All the reply_ methods just return 0, there is no real remote
+ //! peer, this is just a test harness
+ virtual int reply_data (ACE_RMCast::Data &)
+ {
+ return 0;
+ }
+ virtual int reply_poll (ACE_RMCast::Poll &)
+ {
+ return 0;
+ }
+ virtual int reply_ack_join (ACE_RMCast::Ack_Join &)
+ {
+ return 0;
+ }
+ virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &)
+ {
+ return 0;
+ }
+ virtual int reply_ack (ACE_RMCast::Ack &)
+ {
+ return 0;
+ }
+ virtual int reply_join (ACE_RMCast::Join &)
+ {
+ return 0;
+ }
+ virtual int reply_leave (ACE_RMCast::Leave &)
+ {
+ return 0;
+ }
+ //@}
+
+private:
+ //! Remember if we joined the group already.
+ int joined_;
+};
+
+// ****************************************************************
+
+//! The number of proxies used in the test
+/*!
+ * Not all member will be present in the group at the same time. But
+ * this variable controls the maximum number
+ */
+const size_t nproxy = 16;
+
+//! A simple module to receive the messages from ACE_RMCast_Membership
+/*!
+ * The ACE_RMCast_Membership layer pushes messages to its next module
+ * when all the members have acked a message, when a new member joins,
+ * when a member leaves, etc.
+ * This class will verify that the messages are exactly what we
+ * expect.
+ */
+class Tester : public ACE_RMCast_Module
+{
+public:
+ Tester (void);
+
+ //! Run the test for \iterations times
+ void run (int iterations);
+
+ virtual int join (ACE_RMCast::Join &join);
+ virtual int leave (ACE_RMCast::Leave &leave);
+ virtual int ack (ACE_RMCast::Ack &ack);
+
+private:
+ //! Add a few proxies to the group
+ void join_random (void);
+
+ //! Remove a few proxies from the group
+ void leave_random (void);
+
+ //! Generate a few ack messages from all the proxies currently in
+ //! the group
+ void generate_acks (int iterations);
+
+private:
+ //! The array of proxies
+ Test_Proxy proxy_[nproxy];
+
+ //! The Membership layer
+ ACE_RMCast_Membership membership_;
+
+ //! Synchronize internal data structures
+ ACE_SYNCH_MUTEX lock_;
+
+ //! The test is randomized to get better coverage. This is the seed
+ //! variable for the test
+ ACE_RANDR_TYPE seed_;
+};
+
+// ****************************************************************
+
+//! An Adapter to run Tester::run the test is a separate thread
+class Task : public ACE_Task_Base
+{
+public:
+ Task (Tester *tester);
+
+ // = Read the documentation in "ace/Task.h"
+ int svc (void);
+
+private:
+ //! The tester object.
+ Tester *tester_;
+};
+
+// ****************************************************************
+
+int
+main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("RMCast_Membership_Test"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("This is ACE Version %u.%u.%u\n\n"),
+ ACE::major_version(),
+ ACE::minor_version(),
+ ACE::beta_version()));
+
+ {
+ ACE_DEBUG ((LM_DEBUG, "Running single threaded test\n"));
+ //! Run the test in single threaded mode
+ Tester tester;
+ tester.run (100);
+ }
+ {
+ ACE_DEBUG ((LM_DEBUG, "Running multi threaded test\n"));
+ //! Run the test in multi-threaded mode
+ Tester tester;
+ Task task (&tester);
+ if (task.activate (THR_NEW_LWP|THR_JOINABLE, 4) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Cannot activate the threads\n"), 1);
+ ACE_Thread_Manager::instance ()->wait ();
+ }
+
+ ACE_END_TEST;
+ return 0;
+}
+
+// ****************************************************************
+
+Test_Proxy::Test_Proxy (void)
+ : joined_ (0)
+{
+}
+
+// ****************************************************************
+
+Tester::Tester (void)
+ : seed_ (ACE_OS::gethrtime ())
+{
+ // Initialize the stack...
+ this->membership_.next (this);
+ for (size_t i = 0; i != nproxy; ++i)
+ this->proxy_[i].next (&this->membership_);
+}
+
+void
+Tester::run (int iterations)
+{
+ for (int i = 0; i < iterations; ++i)
+ {
+ // Connect a few....
+ this->join_random ();
+
+ // Push acks....
+ this->generate_acks (iterations);
+
+ // Disconnect a few
+ this->leave_random ();
+
+ // Push acks...
+ this->generate_acks (iterations / 10);
+ }
+}
+
+int
+Tester::join (ACE_RMCast::Join &join)
+{
+ if (join.source == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid join message in Tester\n"),
+ -1);
+ }
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ if (&this->proxy_[i] != join.source)
+ continue;
+ if (this->proxy_[i].joined () != 1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid state for proxy %d "
+ "in Tester::join\n"),
+ -1);
+ return 0;
+ }
+ // Not found
+ ACE_ERROR_RETURN ((LM_ERROR, "Unknown proxy in Tester::join\n"), -1);
+}
+
+int
+Tester::leave (ACE_RMCast::Leave &leave)
+{
+ if (leave.source == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid leave message in Tester\n"),
+ -1);
+ }
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ if (&this->proxy_[i] != leave.source)
+ continue;
+ if (this->proxy_[i].joined () != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid state for proxy %d "
+ "in Tester::leave\n"),
+ -1);
+ return 0;
+ }
+ // Not found
+ ACE_ERROR_RETURN ((LM_ERROR, "Unknown proxy in Tester::leave\n"), -1);
+}
+
+int
+Tester::ack (ACE_RMCast::Ack &ack)
+{
+ // After the membership layer the source makes no sense....
+ if (ack.source == 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid ack message in Tester\n"),
+ -1);
+ }
+
+ // ACE_DEBUG ((LM_DEBUG,
+ // "Received ack in Tester %d,%d\n",
+ // ack.highest_in_sequence,
+ // ack.highest_received));
+
+ // Assume the lock is held, verify that the ack message satisfy the
+ // invariants...
+ ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 highest_received;
+ int set = 0;
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ if (!this->proxy_[i].joined ())
+ continue;
+ if (!set)
+ {
+ set = 1;
+ highest_in_sequence = this->proxy_[i].highest_in_sequence ();
+ highest_received = this->proxy_[i].highest_received ();
+ }
+ else
+ {
+ if (highest_in_sequence >
+ this->proxy_[i].highest_in_sequence ())
+ {
+ highest_in_sequence =
+ this->proxy_[i].highest_in_sequence ();
+ }
+ if (highest_received <
+ this->proxy_[i].highest_received ())
+ {
+ highest_received =
+ this->proxy_[i].highest_received ();
+ }
+ }
+ }
+ // No local proxy just return...
+ if (!set)
+ return 0;
+
+ // Check the invariants
+ if (ack.highest_in_sequence != highest_in_sequence)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid highest_in_sequence in Ack\n"),
+ -1);
+ }
+ if (ack.highest_received != highest_received)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Invalid highest_received in Ack\n"),
+ -1);
+ }
+ return 0;
+}
+
+void
+Tester::join_random (void)
+{
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+ int r = ACE_OS::rand_r (this->seed_) % 100;
+ if (this->proxy_[i].joined () == 0 && r > 25)
+ {
+ this->proxy_[i].joined (1);
+
+ ACE_RMCast::Join join;
+ join.source = &this->proxy_[i];
+ // ACE_DEBUG ((LM_DEBUG,
+ // "Sending join mesage for proxy %d\n",
+ // i));
+ this->proxy_[i].join (join);
+ }
+ }
+}
+
+void
+Tester::leave_random (void)
+{
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+ int r = ACE_OS::rand_r (this->seed_) % 100;
+ if (this->proxy_[i].joined () == 1 && r > 75)
+ {
+ this->proxy_[i].joined (0);
+
+ ACE_RMCast::Leave leave;
+ leave.source = &this->proxy_[i];
+ // ACE_DEBUG ((LM_DEBUG,
+ // "Sending leave mesage for proxy %d\n",
+ // i));
+ this->proxy_[i].leave (leave);
+ }
+ }
+}
+
+void
+Tester::generate_acks (int iterations)
+{
+ int n = 0;
+ for (size_t i = 0; n < iterations && i != nproxy; ++i, ++n)
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+ int r = ACE_OS::rand_r (this->seed_) % 10;
+ if (this->proxy_[i].joined () == 0)
+ continue;
+
+ ACE_RMCast::Ack ack;
+ ack.source = &this->proxy_[i];
+ ack.highest_in_sequence =
+ this->proxy_[i].highest_in_sequence ();
+ ack.highest_received =
+ this->proxy_[i].highest_received ();
+
+ // we randomly perform one of 3 actions, with different
+ // probabilities
+ switch (r)
+ {
+ case 0:
+ // Ack the same data that we already have:
+ break;
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ // Simulate out of sequence messages...
+ ack.highest_received++;
+ break;
+ default:
+ if (ack.highest_received > ack.highest_in_sequence)
+ ack.highest_in_sequence++;
+ break;
+ }
+ // ACE_DEBUG ((LM_DEBUG,
+ // "Sending ack message (%d,%d) through proxy %d\n",
+ // ack.highest_in_sequence,
+ // ack.highest_received,
+ // i));
+ int result = this->proxy_[i].ack (ack);
+ if (result != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Returned %d in proxy %d\n",
+ result, i));
+ }
+ }
+}
+
+// ****************************************************************
+
+Task::Task (Tester *tester)
+ : tester_ (tester)
+{
+}
+
+int
+Task::svc (void)
+{
+ this->tester_->run (100);
+ return 0;
+}