diff options
-rw-r--r-- | ChangeLog | 22 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 22 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 22 | ||||
-rw-r--r-- | ace/RMCast/Makefile | 6 | ||||
-rw-r--r-- | ace/RMCast/RMCast.h | 292 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Membership.cpp | 24 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Module.h | 49 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Proxy.cpp | 8 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Proxy.h | 55 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Reassembly.cpp | 5 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Reassembly.h | 16 | ||||
-rw-r--r-- | ace/RMCast/RMCast_Retransmission.cpp | 3 | ||||
-rw-r--r-- | protocols/ace/RMCast/Makefile | 6 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast.h | 292 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Membership.cpp | 24 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Module.h | 49 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Proxy.cpp | 8 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Proxy.h | 55 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reassembly.cpp | 5 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reassembly.h | 16 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Retransmission.cpp | 3 | ||||
-rw-r--r-- | tests/RMCast/Makefile | 196 | ||||
-rw-r--r-- | tests/RMCast/RMCast_Membership_Test.cpp | 449 |
23 files changed, 1281 insertions, 346 deletions
diff --git a/ChangeLog b/ChangeLog index a4b4d34f6a8..b77525d55c1 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,25 @@ +Wed Sep 27 08:23:58 2000 Carlos O'Ryan <coryan@uci.edu> + + * ace/RMCast/Makefile: + Updated dependencies + + * ace/RMCast/RMCast.h: + * ace/RMCast/RMCast_Module.h: + * ace/RMCast/RMCast_Proxy.cpp: + * ace/RMCast/RMCast_Proxy.h: + * ace/RMCast/RMCast_Reassembly.h: + * ace/RMCast/RMCast_Reassembly.cpp: + * ace/RMCast/RMCast_Retransmission.cpp: + Update comments to be doxygen friendly + + * ace/RMCast/RMCast_Membership.cpp: + Fixed problems in Ack management, we were stopping the useful + Acks, not the ones that just represented repeated information. + + * tests/RMCast/Makefile: + * tests/RMCast/RMCast_Membership_Test.cpp: + Add test for the ACE_RMCast_Membership class. + Tue Sep 26 22:39:42 2000 Darrell Brunsch <brunsch@uci.edu> * examples/Reactor/WFMO_Reactor/test_abandoned.cpp: diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index a4b4d34f6a8..b77525d55c1 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,25 @@ +Wed Sep 27 08:23:58 2000 Carlos O'Ryan <coryan@uci.edu> + + * ace/RMCast/Makefile: + Updated dependencies + + * ace/RMCast/RMCast.h: + * ace/RMCast/RMCast_Module.h: + * ace/RMCast/RMCast_Proxy.cpp: + * ace/RMCast/RMCast_Proxy.h: + * ace/RMCast/RMCast_Reassembly.h: + * ace/RMCast/RMCast_Reassembly.cpp: + * ace/RMCast/RMCast_Retransmission.cpp: + Update comments to be doxygen friendly + + * ace/RMCast/RMCast_Membership.cpp: + Fixed problems in Ack management, we were stopping the useful + Acks, not the ones that just represented repeated information. + + * tests/RMCast/Makefile: + * tests/RMCast/RMCast_Membership_Test.cpp: + Add test for the ACE_RMCast_Membership class. + Tue Sep 26 22:39:42 2000 Darrell Brunsch <brunsch@uci.edu> * examples/Reactor/WFMO_Reactor/test_abandoned.cpp: diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index a4b4d34f6a8..b77525d55c1 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,25 @@ +Wed Sep 27 08:23:58 2000 Carlos O'Ryan <coryan@uci.edu> + + * ace/RMCast/Makefile: + Updated dependencies + + * ace/RMCast/RMCast.h: + * ace/RMCast/RMCast_Module.h: + * ace/RMCast/RMCast_Proxy.cpp: + * ace/RMCast/RMCast_Proxy.h: + * ace/RMCast/RMCast_Reassembly.h: + * ace/RMCast/RMCast_Reassembly.cpp: + * ace/RMCast/RMCast_Retransmission.cpp: + Update comments to be doxygen friendly + + * ace/RMCast/RMCast_Membership.cpp: + Fixed problems in Ack management, we were stopping the useful + Acks, not the ones that just represented repeated information. + + * tests/RMCast/Makefile: + * tests/RMCast/RMCast_Membership_Test.cpp: + Add test for the ACE_RMCast_Membership class. + Tue Sep 26 22:39:42 2000 Darrell Brunsch <brunsch@uci.edu> * examples/Reactor/WFMO_Reactor/test_abandoned.cpp: diff --git a/ace/RMCast/Makefile b/ace/RMCast/Makefile index 3241d84bca5..ae13792c4c4 100644 --- a/ace/RMCast/Makefile +++ b/ace/RMCast/Makefile @@ -74,7 +74,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU .obj/RMCast_Module.o .obj/RMCast_Module.so .shobj/RMCast_Module.o .shobj/RMCast_Module.so: RMCast_Module.cpp RMCast_Module.h \ $(ACE_ROOT)/ace/pre.h \ - RMCast.h $(ACE_ROOT)/ace/OS.h \ + RMCast.h \ + $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ $(ACE_ROOT)/ace/ACE_export.h \ $(ACE_ROOT)/ace/svc_export.h \ @@ -99,7 +100,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU .obj/RMCast_Module_Factory.o .obj/RMCast_Module_Factory.so .shobj/RMCast_Module_Factory.o .shobj/RMCast_Module_Factory.so: RMCast_Module_Factory.cpp \ RMCast_Module_Factory.h \ $(ACE_ROOT)/ace/pre.h \ - RMCast.h $(ACE_ROOT)/ace/OS.h \ + RMCast.h \ + $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ $(ACE_ROOT)/ace/ACE_export.h \ $(ACE_ROOT)/ace/svc_export.h \ diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h index c7cdf718c04..abf2a24e946 100644 --- a/ace/RMCast/RMCast.h +++ b/ace/RMCast/RMCast.h @@ -28,69 +28,22 @@ class ACE_Message_Block; class ACE_RMCast_Proxy; +//! The RMCast namespace +/*! + Several simple data structures and enums are shared by all the + RMCast components, this is the place where we put them by default. +*/ class ACE_RMCast_Export ACE_RMCast { public: - // Message formats - - // From SENDER to RECEIVER - // - // POLL - // +---------+----------------------+ - // | 8 bits | MT_POLL | - // +---------+----------------------+ - // - // ACK_JOIN - // +---------+----------------------+ - // | 8 bits | MT_ACK_JOIN | - // +---------+----------------------+ - // | 32 bits | next_sequence_number | - // +---------+----------------------+ - // - // ACK_LEAVE - // +---------+----------------------+ - // | 8 bits | ACK_LEAVE | - // +---------+----------------------+ - // - // DATA - // +---------+----------------------+ - // | 8 bits | DATA | - // +---------+----------------------+ - // | 32 bits | sequence_number | - // +---------+----------------------+ - // | 32 bits | message_size | - // +---------+----------------------+ - // | 32 bits | fragment_offset | - // +---------+----------------------+ - // ? ? ? ? ? | 32 bits | payload_size | - // ? ? ? ? ? +---------+----------------------+ - // | | payload | - // +---------+----------------------+ - // - - // From RECEIVER to SENDER - // - // MT_JOIN - // +---------+----------------------+ - // | 8 bits | MT_JOIN | - // +---------+----------------------+ - // - // MT_LEAVE - // +---------+----------------------+ - // | 8 bits | MT_LEAVE | - // +---------+----------------------+ - // - // MT_ACK - // +---------+----------------------+ - // | 8 bits | MT_ACK | - // +---------+----------------------+ - // | 32 bits | highest_in_sequence | - // +---------+----------------------+ - // | 32 bits | highest_received | - // +---------+----------------------+ - // - + //! The message types + /*! + Each message includes a type field in the header used by the + receiver to correctly parse it. + Classes with the same name as the message type describe the actual + format of the message. + */ enum Message_Type { // Sender initiated @@ -105,6 +58,41 @@ public: MT_LAST }; + //! Simple enum used to describe the receiver state transitions + /*! + Receivers go through several states before they can fully accept + messages, the following comments describe those states, as well as + the possible transitions + This configuration is pesimistic, any invalid message is cause + enough to reclaim all the resources. This partially addresses + situations where either accidentally or intentionally a sender is + multicasting packets to the wrong group. + + <CODE> + NON_EXISTENT JOINING JOINED LEAVING<BR> + ----------------------------------------------------------------<BR> + POLL JOINING JOINING JOINED LEAVING<BR> + Send/Join Send/Join Send/Ack Send/Leave<BR> + <BR> + ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + ACK_JOIN JOINING JOINED JOINED LEAVING<BR> + Send/Join Update ACT Update ACT Send/Leave<BR> + <BR> + ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + SEND_DATA JOINING JOINING JOINED LEAVING<BR> + Send/Join Send/Join Recv/Data Send/Leave<BR> + </CODE> + */ enum Receiver_State { RS_NON_EXISTENT, @@ -113,81 +101,75 @@ public: RS_LEAVING }; - // State transition (and actions) for the receivers. - // This configuration is pesimistic, any invalid message is cause - // enough to reclaim all the resources. This partially addresses - // situations where either accidentally or intentionally a sender is - // multicasting packets to the wrong group. - // - // NON_EXISTENT JOINING JOINED LEAVING - // ---------------------------------------------------------------- - // POLL JOINING JOINING JOINED LEAVING - // Send/Join Send/Join Send/Ack Send/Leave - // - // ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // ACK_JOIN JOINING JOINED JOINED LEAVING - // Send/Join Update ACT Update ACT Send/Leave - // - // ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // SEND_DATA JOINING JOINING JOINED LEAVING - // Send/Join Send/Join Recv/Data Send/Leave - // + //! Simle enum used to describe the state transitions for senders + /*! + State transition (and actions) for the senders. + This configuration is pesimistic, any invalid message is cause + enough to reclaim all the resources. This partially addresses + situations where either accidentally or intentionally a sender is + multicasting packets to the wrong group. + + <CODE> + NON_EXISTENT JOINED<BR> + ------------------------------------------<BR> + POLL NON_EXISTENT NON_EXISTENT<BR> + Destroy Destroy<BR> + <BR> + ACK NON_EXISTENT JOINED<BR> + Noop Process/Ack<BR> + <BR> + JOIN JOINED NON_EXISTENT<BR> + Send/Join_Ack Send/Join_Ack<BR> + <BR> + LEAVE NON_EXISTENT NON_EXISTENT<BR> + Send/Leave_Ack Send/Leave_Ack<BR> + Destroy<BR> + <BR> + ACK_JOIN NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy<BR> + <BR> + ACK_LEAVE NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy<BR> + <BR> + SEND_DATA NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy<BR> + </CODE> + */ enum Sender_State { SS_NON_EXISTENT, SS_JOINED }; - // State transition (and actions) for the senders. - // This configuration is pesimistic, any invalid message is cause - // enough to reclaim all the resources. This partially addresses - // situations where either accidentally or intentionally a sender is - // multicasting packets to the wrong group. - // - // NON_EXISTENT JOINED - // ------------------------------------------ - // POLL NON_EXISTENT NON_EXISTENT - // Destroy Destroy - // - // ACK NON_EXISTENT JOINED - // Noop Process/Ack - // - // JOIN JOINED NON_EXISTENT - // Send/Join_Ack Send/Join_Ack - // - // LEAVE NON_EXISTENT NON_EXISTENT - // Send/Leave_Ack Send/Leave_Ack - // Destroy - // - // ACK_JOIN NON_EXISTENT NON_EXISTENT - // Noop Destroy - // - // ACK_LEAVE NON_EXISTENT NON_EXISTENT - // Noop Destroy - // - // SEND_DATA NON_EXISTENT NON_EXISTENT - // Noop Destroy - // - // These structures define the basic layout of the messages. + + //! This is the main message sent by senders + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | DATA |<BR> + +---------+----------------------+<BR> + | 32 bits | sequence_number |<BR> + +---------+----------------------+<BR> + | 32 bits | message_size |<BR> + +---------+----------------------+<BR> + | 32 bits | fragment_offset |<BR> + +---------+----------------------+<BR> + ? ? ? ? ? | 32 bits | payload_size |<BR> + ? ? ? ? ? +---------+----------------------+<BR> + | | payload |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Data { // Source ID is implicit in recvfrom()... ACE_UINT32 sequence_number; ACE_UINT32 total_size; ACE_UINT32 fragment_offset; + // @@ TODO: we may want to add optional fields, such as: // - Polling clients for their status // - Sending the range of messages in the queue @@ -196,41 +178,113 @@ public: ACE_Message_Block *payload; + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_POLL |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Poll { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Receivers accept new members using this message + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_ACK_JOIN |<BR> + +---------+----------------------+<BR> + | 32 bits | next_sequence_number |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Ack_Join { ACE_INT32 next_sequence_number; + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Senders acknowledge when receivers try to leave + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | ACK_LEAVE |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Ack_Leave { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Provide feedback to the sender about messages received and sent + //! so far. + /*! + * + * This message is used to provide feedback information to senders. + * It contains two sequence numbers: + * - highest_in_sequence: is the sequence number of the last message + * received without any lost messages before it + * - highest_received: is the sequence number of the last_message + * successfully received, there may be some messages lost before it + * + * <CODE> + * +---------+----------------------+<BR> + * | 8 bits | MT_ACK |<BR> + * +---------+----------------------+<BR> + * | 32 bits | highest_in_sequence |<BR> + * +---------+----------------------+<BR> + * | 32 bits | highest_received |<BR> + * +---------+----------------------+<BR> + * </CODE> + */ struct Ack { + //! The last message received without any losses before it. ACE_UINT32 highest_in_sequence; + + //! The last message successfully received ACE_UINT32 highest_received; + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Receivers send this message to indicate they want to join + /* + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_JOIN |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Join { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Receivers send this message to disconnect gracefully + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_LEAVE |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Leave { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; }; diff --git a/ace/RMCast/RMCast_Membership.cpp b/ace/RMCast/RMCast_Membership.cpp index 8ab58d84475..6ee2690a41f 100644 --- a/ace/RMCast/RMCast_Membership.cpp +++ b/ace/RMCast/RMCast_Membership.cpp @@ -18,11 +18,13 @@ ACE_RMCast_Membership::~ACE_RMCast_Membership (void) int ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) { + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack\n")); Proxy_Iterator end = this->proxies_.end (); Proxy_Iterator i = this->proxies_.begin (); if (i == end) return 0; + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[2]\n")); ACE_RMCast::Ack next_ack; { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); @@ -30,11 +32,13 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) { // @@ This violates an invariant of the class, shouldn't // happen... + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n")); return -1; } else if (ack.highest_in_sequence == this->highest_in_sequence_) { // Nothing new, just continue.... + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n")); return 0; } // Possible update, re-evaluate the story... @@ -52,12 +56,15 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) if (r > highest_received) highest_received = r; } +#if 0 if (this->highest_in_sequence_ >= highest_in_sequence - || this->highest_received_ < highest_received) + || this->highest_received_ >= highest_received) { // No change.... + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[5]\n")); return 0; } +#endif /* 0 */ this->highest_in_sequence_ = highest_in_sequence; this->highest_received_ = highest_received; if (this->next () == 0) @@ -84,10 +91,7 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join) return -1; } - if (this->next () == 0) - return 0; - - return this->next ()->join (join); + return this->ACE_RMCast_Module::join (join); } int @@ -98,18 +102,16 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave) { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - if (this->proxies_.remove (leave.source) == -1) - return 0; + (void) this->proxies_.remove (leave.source); } - if (this->next () == 0) - return 0; - - return this->next ()->leave (leave); + return this->ACE_RMCast_Module::leave (leave); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Unbounded_Set<ACE_RMCast_Proxy*>; +template class ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*>; +template class ACE_Node<ACE_RMCast_Proxy*>; #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/RMCast/RMCast_Module.h b/ace/RMCast/RMCast_Module.h index 9f83c2d5be4..dc4077fa4ab 100644 --- a/ace/RMCast/RMCast_Module.h +++ b/ace/RMCast/RMCast_Module.h @@ -27,51 +27,66 @@ class ACE_Message_Block; class ACE_Time_Value; +//! Reliable Multicast Module +/*! + The reliable multicast protocol is implemented as a stack of + "Modules" each one performing one specific task. In short, this is + an instance of the pipes-and-filters architectural pattern. +*/ class ACE_RMCast_Export ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Module - // - // = DESCRIPTION - // The reliable multicast protocol is implemented as a stack of - // "Modules" each one performing one specific task. - // In short, this is an instance of the pipes-and-filters - // architectural pattern. - // public: // = Initialization and termination methods. ACE_RMCast_Module (void); - // Constructor + //!< Constructor virtual ~ACE_RMCast_Module (void); - // Destructor + //!< Destructor virtual int next (ACE_RMCast_Module *next); + //!< Modifier for the next element in the stack + virtual ACE_RMCast_Module* next (void) const; + //!< Accesor for the next element in the stack + virtual int prev (ACE_RMCast_Module *prev); + //!< Modifier for the previous element in the stack + virtual ACE_RMCast_Module* prev (void) const; - // Modifiers and accessors for the previous and next module in the - // stack + //!< Accesor for the previous element in the stack virtual int open (void); - // Initialize the module, setting up the next module + //!< Initialize the module, setting up the next module virtual int close (void); - // Close the module. + //!< Close the module. virtual int data (ACE_RMCast::Data &); + //!< Push data through the stack + virtual int poll (ACE_RMCast::Poll &); + //!< Push a polling request through the stack + virtual int ack_join (ACE_RMCast::Ack_Join &); + //!< Push a message to ack a join request through the stack + virtual int ack_leave (ACE_RMCast::Ack_Leave &); + //!< Push a message to ack a leave request through the stack + virtual int ack (ACE_RMCast::Ack &); + //!< Push an ack mesage through the stack + virtual int join (ACE_RMCast::Join &); + //!< Push a join message through the stack + virtual int leave (ACE_RMCast::Leave &); - // Push data down the stack + //!< Push a leave message through the stack private: + //! The next element in the stack ACE_RMCast_Module *next_; + //! The previous element in the stack ACE_RMCast_Module *prev_; - // The next and previous modules in the stack }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Proxy.cpp b/ace/RMCast/RMCast_Proxy.cpp index 2a8b2ba40b4..53d9d0b6726 100644 --- a/ace/RMCast/RMCast_Proxy.cpp +++ b/ace/RMCast/RMCast_Proxy.cpp @@ -25,3 +25,11 @@ ACE_RMCast_Proxy::highest_received (void) const { return this->highest_received_; } + +int +ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack) +{ + this->highest_in_sequence_ = ack.highest_in_sequence; + this->highest_received_ = ack.highest_received; + return this->ACE_RMCast_Module::ack (ack); +} diff --git a/ace/RMCast/RMCast_Proxy.h b/ace/RMCast/RMCast_Proxy.h index dca6494e374..414b74174fb 100644 --- a/ace/RMCast/RMCast_Proxy.h +++ b/ace/RMCast/RMCast_Proxy.h @@ -27,28 +27,48 @@ class ACE_Message_Block; class ACE_Time_Value; +//! Local representation for remote peers +/*! + Both senders and receivers in the multicast group need to maintain + explicit representations of their "peers". For example, a sender + needs to know the list of all the receivers and what messages they + have reported as successfully received. + Likewise, the receiver needs to maintain separate state for each + remote sender, and must be able to disconnect from all of them + gracefully when needed. + The RMCast_Proxy class is an opaque representation of such a peer, + and hides all the networking details from the rest of the system. +*/ class ACE_RMCast_Export ACE_RMCast_Proxy : public ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Proxy - // - // = DESCRIPTION - // The proxy is used to send back messages to either a single - // receiver (o supplier). - // public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Proxy (void); // Constructor + //! Destructor virtual ~ACE_RMCast_Proxy (void); - // Destructor + //! Return the highest sequence number received without any losses + //! before it. Only applies to remote receiver proxies. + /*! + Please read the documentation in ACE_RMCast::Ack + */ virtual ACE_UINT32 highest_in_sequence (void) const; + + //! Return the highest sequence number successfully received. + //! Only applies to remote receiver proxies. + /*! + Please read the documentation in ACE_RMCast::Ack + */ virtual ACE_UINT32 highest_received (void) const; - // Get the sequence numbers received by the remote proxy. - // Return 0 for sender proxies + //@{ + //! Send messages directly to the peer. + /*! Send a message directly to the peer, i.e. the message is not + sent through the multicast group and it may not be processed by + all the layers in the stack. + */ virtual int reply_data (ACE_RMCast::Data &) = 0; virtual int reply_poll (ACE_RMCast::Poll &) = 0; virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0; @@ -56,16 +76,21 @@ public: virtual int reply_ack (ACE_RMCast::Ack &) = 0; virtual int reply_join (ACE_RMCast::Join &) = 0; virtual int reply_leave (ACE_RMCast::Leave &) = 0; - // Push data back to the remote proxy + //@} - // = The RMCast_Module methods + /*! + Proxies process the ACK sequence numbers to save the sequence + numbers reported from the remote peer. + */ virtual int ack (ACE_RMCast::Ack &); private: + //@{ + //! Cache the sequence numbers reported from the remote peer using + //! Ack messages ACE_UINT32 highest_in_sequence_; ACE_UINT32 highest_received_; - // Cache the sequence numbers reported from the remote peer using - // Ack messages + //@} }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Reassembly.cpp b/ace/RMCast/RMCast_Reassembly.cpp index ba2e9b79c1a..ed488341bae 100644 --- a/ace/RMCast/RMCast_Reassembly.cpp +++ b/ace/RMCast/RMCast_Reassembly.cpp @@ -22,6 +22,10 @@ ACE_RMCast_Reassembly (void) ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void) { + /*!< + We cleanup the resources in the destructor + <B color=red>@@ TODO</B> Why not in the close() operation? + */ for (Message_Map_Iterator i = this->messages_.begin (); i != this->messages_.end (); ++i) @@ -87,6 +91,7 @@ ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data) // Push the message... ACE_RMCast::Data downstream_data; + downstream_data.source = data.source; downstream_data.sequence_number = data.sequence_number; downstream_data.total_size = message->message_body ()->length (); downstream_data.fragment_offset = 0; diff --git a/ace/RMCast/RMCast_Reassembly.h b/ace/RMCast/RMCast_Reassembly.h index 0bf0c3a61ee..6dc37e1ae19 100644 --- a/ace/RMCast/RMCast_Reassembly.h +++ b/ace/RMCast/RMCast_Reassembly.h @@ -24,19 +24,28 @@ class ACE_RMCast_Partial_Message; +//! Reassemble multiple data fragments into a single data message +/*! + Data messages may not fit in a single MTU in the transport layer, in + that case the application configure a RMCast_Fragment module on the + sender side. On the receiver side this layer reassemble the + messages sent from a <EM>single</EM> source, and passes the messages + up the stream. +*/ class ACE_RMCast_Export ACE_RMCast_Reassembly : public ACE_RMCast_Module { public: + //! Constructor ACE_RMCast_Reassembly (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Reassembly (void); - // Destructor // = The ACE_RMCast_Module methods virtual int data (ACE_RMCast::Data &data); private: + //! A mutex used to synchronize all the internal operations. ACE_SYNCH_MUTEX mutex_; typedef ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex> @@ -45,8 +54,9 @@ private: ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex> Message_Map_Iterator; + //! A map, indexed by sequence number, of the partially received + //! messages. Message_Map messages_; - // The array of partially received messages }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp index b90cf0ddf31..a996e1204d5 100644 --- a/ace/RMCast/RMCast_Retransmission.cpp +++ b/ace/RMCast/RMCast_Retransmission.cpp @@ -96,6 +96,9 @@ ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_RB_Tree_Iterator_Base<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile index 3241d84bca5..ae13792c4c4 100644 --- a/protocols/ace/RMCast/Makefile +++ b/protocols/ace/RMCast/Makefile @@ -74,7 +74,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU .obj/RMCast_Module.o .obj/RMCast_Module.so .shobj/RMCast_Module.o .shobj/RMCast_Module.so: RMCast_Module.cpp RMCast_Module.h \ $(ACE_ROOT)/ace/pre.h \ - RMCast.h $(ACE_ROOT)/ace/OS.h \ + RMCast.h \ + $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ $(ACE_ROOT)/ace/ACE_export.h \ $(ACE_ROOT)/ace/svc_export.h \ @@ -99,7 +100,8 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU .obj/RMCast_Module_Factory.o .obj/RMCast_Module_Factory.so .shobj/RMCast_Module_Factory.o .shobj/RMCast_Module_Factory.so: RMCast_Module_Factory.cpp \ RMCast_Module_Factory.h \ $(ACE_ROOT)/ace/pre.h \ - RMCast.h $(ACE_ROOT)/ace/OS.h \ + RMCast.h \ + $(ACE_ROOT)/ace/OS.h \ $(ACE_ROOT)/ace/post.h \ $(ACE_ROOT)/ace/ACE_export.h \ $(ACE_ROOT)/ace/svc_export.h \ diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h index c7cdf718c04..abf2a24e946 100644 --- a/protocols/ace/RMCast/RMCast.h +++ b/protocols/ace/RMCast/RMCast.h @@ -28,69 +28,22 @@ class ACE_Message_Block; class ACE_RMCast_Proxy; +//! The RMCast namespace +/*! + Several simple data structures and enums are shared by all the + RMCast components, this is the place where we put them by default. +*/ class ACE_RMCast_Export ACE_RMCast { public: - // Message formats - - // From SENDER to RECEIVER - // - // POLL - // +---------+----------------------+ - // | 8 bits | MT_POLL | - // +---------+----------------------+ - // - // ACK_JOIN - // +---------+----------------------+ - // | 8 bits | MT_ACK_JOIN | - // +---------+----------------------+ - // | 32 bits | next_sequence_number | - // +---------+----------------------+ - // - // ACK_LEAVE - // +---------+----------------------+ - // | 8 bits | ACK_LEAVE | - // +---------+----------------------+ - // - // DATA - // +---------+----------------------+ - // | 8 bits | DATA | - // +---------+----------------------+ - // | 32 bits | sequence_number | - // +---------+----------------------+ - // | 32 bits | message_size | - // +---------+----------------------+ - // | 32 bits | fragment_offset | - // +---------+----------------------+ - // ? ? ? ? ? | 32 bits | payload_size | - // ? ? ? ? ? +---------+----------------------+ - // | | payload | - // +---------+----------------------+ - // - - // From RECEIVER to SENDER - // - // MT_JOIN - // +---------+----------------------+ - // | 8 bits | MT_JOIN | - // +---------+----------------------+ - // - // MT_LEAVE - // +---------+----------------------+ - // | 8 bits | MT_LEAVE | - // +---------+----------------------+ - // - // MT_ACK - // +---------+----------------------+ - // | 8 bits | MT_ACK | - // +---------+----------------------+ - // | 32 bits | highest_in_sequence | - // +---------+----------------------+ - // | 32 bits | highest_received | - // +---------+----------------------+ - // - + //! The message types + /*! + Each message includes a type field in the header used by the + receiver to correctly parse it. + Classes with the same name as the message type describe the actual + format of the message. + */ enum Message_Type { // Sender initiated @@ -105,6 +58,41 @@ public: MT_LAST }; + //! Simple enum used to describe the receiver state transitions + /*! + Receivers go through several states before they can fully accept + messages, the following comments describe those states, as well as + the possible transitions + This configuration is pesimistic, any invalid message is cause + enough to reclaim all the resources. This partially addresses + situations where either accidentally or intentionally a sender is + multicasting packets to the wrong group. + + <CODE> + NON_EXISTENT JOINING JOINED LEAVING<BR> + ----------------------------------------------------------------<BR> + POLL JOINING JOINING JOINED LEAVING<BR> + Send/Join Send/Join Send/Ack Send/Leave<BR> + <BR> + ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + ACK_JOIN JOINING JOINED JOINED LEAVING<BR> + Send/Join Update ACT Update ACT Send/Leave<BR> + <BR> + ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy Destroy Destroy<BR> + <BR> + SEND_DATA JOINING JOINING JOINED LEAVING<BR> + Send/Join Send/Join Recv/Data Send/Leave<BR> + </CODE> + */ enum Receiver_State { RS_NON_EXISTENT, @@ -113,81 +101,75 @@ public: RS_LEAVING }; - // State transition (and actions) for the receivers. - // This configuration is pesimistic, any invalid message is cause - // enough to reclaim all the resources. This partially addresses - // situations where either accidentally or intentionally a sender is - // multicasting packets to the wrong group. - // - // NON_EXISTENT JOINING JOINED LEAVING - // ---------------------------------------------------------------- - // POLL JOINING JOINING JOINED LEAVING - // Send/Join Send/Join Send/Ack Send/Leave - // - // ACK NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // JOIN NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // ACK_JOIN JOINING JOINED JOINED LEAVING - // Send/Join Update ACT Update ACT Send/Leave - // - // ACK_LEAVE NON_EXISTENT NON_EXISTENT NON_EXISTENT NON_EXISTENT - // Noop Destroy Destroy Destroy - // - // SEND_DATA JOINING JOINING JOINED LEAVING - // Send/Join Send/Join Recv/Data Send/Leave - // + //! Simle enum used to describe the state transitions for senders + /*! + State transition (and actions) for the senders. + This configuration is pesimistic, any invalid message is cause + enough to reclaim all the resources. This partially addresses + situations where either accidentally or intentionally a sender is + multicasting packets to the wrong group. + + <CODE> + NON_EXISTENT JOINED<BR> + ------------------------------------------<BR> + POLL NON_EXISTENT NON_EXISTENT<BR> + Destroy Destroy<BR> + <BR> + ACK NON_EXISTENT JOINED<BR> + Noop Process/Ack<BR> + <BR> + JOIN JOINED NON_EXISTENT<BR> + Send/Join_Ack Send/Join_Ack<BR> + <BR> + LEAVE NON_EXISTENT NON_EXISTENT<BR> + Send/Leave_Ack Send/Leave_Ack<BR> + Destroy<BR> + <BR> + ACK_JOIN NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy<BR> + <BR> + ACK_LEAVE NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy<BR> + <BR> + SEND_DATA NON_EXISTENT NON_EXISTENT<BR> + Noop Destroy<BR> + </CODE> + */ enum Sender_State { SS_NON_EXISTENT, SS_JOINED }; - // State transition (and actions) for the senders. - // This configuration is pesimistic, any invalid message is cause - // enough to reclaim all the resources. This partially addresses - // situations where either accidentally or intentionally a sender is - // multicasting packets to the wrong group. - // - // NON_EXISTENT JOINED - // ------------------------------------------ - // POLL NON_EXISTENT NON_EXISTENT - // Destroy Destroy - // - // ACK NON_EXISTENT JOINED - // Noop Process/Ack - // - // JOIN JOINED NON_EXISTENT - // Send/Join_Ack Send/Join_Ack - // - // LEAVE NON_EXISTENT NON_EXISTENT - // Send/Leave_Ack Send/Leave_Ack - // Destroy - // - // ACK_JOIN NON_EXISTENT NON_EXISTENT - // Noop Destroy - // - // ACK_LEAVE NON_EXISTENT NON_EXISTENT - // Noop Destroy - // - // SEND_DATA NON_EXISTENT NON_EXISTENT - // Noop Destroy - // - // These structures define the basic layout of the messages. + + //! This is the main message sent by senders + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | DATA |<BR> + +---------+----------------------+<BR> + | 32 bits | sequence_number |<BR> + +---------+----------------------+<BR> + | 32 bits | message_size |<BR> + +---------+----------------------+<BR> + | 32 bits | fragment_offset |<BR> + +---------+----------------------+<BR> + ? ? ? ? ? | 32 bits | payload_size |<BR> + ? ? ? ? ? +---------+----------------------+<BR> + | | payload |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Data { // Source ID is implicit in recvfrom()... ACE_UINT32 sequence_number; ACE_UINT32 total_size; ACE_UINT32 fragment_offset; + // @@ TODO: we may want to add optional fields, such as: // - Polling clients for their status // - Sending the range of messages in the queue @@ -196,41 +178,113 @@ public: ACE_Message_Block *payload; + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_POLL |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Poll { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Receivers accept new members using this message + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_ACK_JOIN |<BR> + +---------+----------------------+<BR> + | 32 bits | next_sequence_number |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Ack_Join { ACE_INT32 next_sequence_number; + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Senders acknowledge when receivers try to leave + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | ACK_LEAVE |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Ack_Leave { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Provide feedback to the sender about messages received and sent + //! so far. + /*! + * + * This message is used to provide feedback information to senders. + * It contains two sequence numbers: + * - highest_in_sequence: is the sequence number of the last message + * received without any lost messages before it + * - highest_received: is the sequence number of the last_message + * successfully received, there may be some messages lost before it + * + * <CODE> + * +---------+----------------------+<BR> + * | 8 bits | MT_ACK |<BR> + * +---------+----------------------+<BR> + * | 32 bits | highest_in_sequence |<BR> + * +---------+----------------------+<BR> + * | 32 bits | highest_received |<BR> + * +---------+----------------------+<BR> + * </CODE> + */ struct Ack { + //! The last message received without any losses before it. ACE_UINT32 highest_in_sequence; + + //! The last message successfully received ACE_UINT32 highest_received; + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Receivers send this message to indicate they want to join + /* + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_JOIN |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Join { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; + //! Receivers send this message to disconnect gracefully + /*! + <CODE> + +---------+----------------------+<BR> + | 8 bits | MT_LEAVE |<BR> + +---------+----------------------+<BR> + </CODE> + */ struct Leave { + //! Pass the proxy source between layers ACE_RMCast_Proxy *source; }; }; diff --git a/protocols/ace/RMCast/RMCast_Membership.cpp b/protocols/ace/RMCast/RMCast_Membership.cpp index 8ab58d84475..6ee2690a41f 100644 --- a/protocols/ace/RMCast/RMCast_Membership.cpp +++ b/protocols/ace/RMCast/RMCast_Membership.cpp @@ -18,11 +18,13 @@ ACE_RMCast_Membership::~ACE_RMCast_Membership (void) int ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) { + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack\n")); Proxy_Iterator end = this->proxies_.end (); Proxy_Iterator i = this->proxies_.begin (); if (i == end) return 0; + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[2]\n")); ACE_RMCast::Ack next_ack; { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); @@ -30,11 +32,13 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) { // @@ This violates an invariant of the class, shouldn't // happen... + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n")); return -1; } else if (ack.highest_in_sequence == this->highest_in_sequence_) { // Nothing new, just continue.... + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n")); return 0; } // Possible update, re-evaluate the story... @@ -52,12 +56,15 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) if (r > highest_received) highest_received = r; } +#if 0 if (this->highest_in_sequence_ >= highest_in_sequence - || this->highest_received_ < highest_received) + || this->highest_received_ >= highest_received) { // No change.... + // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[5]\n")); return 0; } +#endif /* 0 */ this->highest_in_sequence_ = highest_in_sequence; this->highest_received_ = highest_received; if (this->next () == 0) @@ -84,10 +91,7 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join) return -1; } - if (this->next () == 0) - return 0; - - return this->next ()->join (join); + return this->ACE_RMCast_Module::join (join); } int @@ -98,18 +102,16 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave) { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - if (this->proxies_.remove (leave.source) == -1) - return 0; + (void) this->proxies_.remove (leave.source); } - if (this->next () == 0) - return 0; - - return this->next ()->leave (leave); + return this->ACE_RMCast_Module::leave (leave); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Unbounded_Set<ACE_RMCast_Proxy*>; +template class ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*>; +template class ACE_Node<ACE_RMCast_Proxy*>; #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h index 9f83c2d5be4..dc4077fa4ab 100644 --- a/protocols/ace/RMCast/RMCast_Module.h +++ b/protocols/ace/RMCast/RMCast_Module.h @@ -27,51 +27,66 @@ class ACE_Message_Block; class ACE_Time_Value; +//! Reliable Multicast Module +/*! + The reliable multicast protocol is implemented as a stack of + "Modules" each one performing one specific task. In short, this is + an instance of the pipes-and-filters architectural pattern. +*/ class ACE_RMCast_Export ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Module - // - // = DESCRIPTION - // The reliable multicast protocol is implemented as a stack of - // "Modules" each one performing one specific task. - // In short, this is an instance of the pipes-and-filters - // architectural pattern. - // public: // = Initialization and termination methods. ACE_RMCast_Module (void); - // Constructor + //!< Constructor virtual ~ACE_RMCast_Module (void); - // Destructor + //!< Destructor virtual int next (ACE_RMCast_Module *next); + //!< Modifier for the next element in the stack + virtual ACE_RMCast_Module* next (void) const; + //!< Accesor for the next element in the stack + virtual int prev (ACE_RMCast_Module *prev); + //!< Modifier for the previous element in the stack + virtual ACE_RMCast_Module* prev (void) const; - // Modifiers and accessors for the previous and next module in the - // stack + //!< Accesor for the previous element in the stack virtual int open (void); - // Initialize the module, setting up the next module + //!< Initialize the module, setting up the next module virtual int close (void); - // Close the module. + //!< Close the module. virtual int data (ACE_RMCast::Data &); + //!< Push data through the stack + virtual int poll (ACE_RMCast::Poll &); + //!< Push a polling request through the stack + virtual int ack_join (ACE_RMCast::Ack_Join &); + //!< Push a message to ack a join request through the stack + virtual int ack_leave (ACE_RMCast::Ack_Leave &); + //!< Push a message to ack a leave request through the stack + virtual int ack (ACE_RMCast::Ack &); + //!< Push an ack mesage through the stack + virtual int join (ACE_RMCast::Join &); + //!< Push a join message through the stack + virtual int leave (ACE_RMCast::Leave &); - // Push data down the stack + //!< Push a leave message through the stack private: + //! The next element in the stack ACE_RMCast_Module *next_; + //! The previous element in the stack ACE_RMCast_Module *prev_; - // The next and previous modules in the stack }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp index 2a8b2ba40b4..53d9d0b6726 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_Proxy.cpp @@ -25,3 +25,11 @@ ACE_RMCast_Proxy::highest_received (void) const { return this->highest_received_; } + +int +ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack) +{ + this->highest_in_sequence_ = ack.highest_in_sequence; + this->highest_received_ = ack.highest_received; + return this->ACE_RMCast_Module::ack (ack); +} diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h index dca6494e374..414b74174fb 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.h +++ b/protocols/ace/RMCast/RMCast_Proxy.h @@ -27,28 +27,48 @@ class ACE_Message_Block; class ACE_Time_Value; +//! Local representation for remote peers +/*! + Both senders and receivers in the multicast group need to maintain + explicit representations of their "peers". For example, a sender + needs to know the list of all the receivers and what messages they + have reported as successfully received. + Likewise, the receiver needs to maintain separate state for each + remote sender, and must be able to disconnect from all of them + gracefully when needed. + The RMCast_Proxy class is an opaque representation of such a peer, + and hides all the networking details from the rest of the system. +*/ class ACE_RMCast_Export ACE_RMCast_Proxy : public ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Proxy - // - // = DESCRIPTION - // The proxy is used to send back messages to either a single - // receiver (o supplier). - // public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Proxy (void); // Constructor + //! Destructor virtual ~ACE_RMCast_Proxy (void); - // Destructor + //! Return the highest sequence number received without any losses + //! before it. Only applies to remote receiver proxies. + /*! + Please read the documentation in ACE_RMCast::Ack + */ virtual ACE_UINT32 highest_in_sequence (void) const; + + //! Return the highest sequence number successfully received. + //! Only applies to remote receiver proxies. + /*! + Please read the documentation in ACE_RMCast::Ack + */ virtual ACE_UINT32 highest_received (void) const; - // Get the sequence numbers received by the remote proxy. - // Return 0 for sender proxies + //@{ + //! Send messages directly to the peer. + /*! Send a message directly to the peer, i.e. the message is not + sent through the multicast group and it may not be processed by + all the layers in the stack. + */ virtual int reply_data (ACE_RMCast::Data &) = 0; virtual int reply_poll (ACE_RMCast::Poll &) = 0; virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0; @@ -56,16 +76,21 @@ public: virtual int reply_ack (ACE_RMCast::Ack &) = 0; virtual int reply_join (ACE_RMCast::Join &) = 0; virtual int reply_leave (ACE_RMCast::Leave &) = 0; - // Push data back to the remote proxy + //@} - // = The RMCast_Module methods + /*! + Proxies process the ACK sequence numbers to save the sequence + numbers reported from the remote peer. + */ virtual int ack (ACE_RMCast::Ack &); private: + //@{ + //! Cache the sequence numbers reported from the remote peer using + //! Ack messages ACE_UINT32 highest_in_sequence_; ACE_UINT32 highest_received_; - // Cache the sequence numbers reported from the remote peer using - // Ack messages + //@} }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp index ba2e9b79c1a..ed488341bae 100644 --- a/protocols/ace/RMCast/RMCast_Reassembly.cpp +++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp @@ -22,6 +22,10 @@ ACE_RMCast_Reassembly (void) ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void) { + /*!< + We cleanup the resources in the destructor + <B color=red>@@ TODO</B> Why not in the close() operation? + */ for (Message_Map_Iterator i = this->messages_.begin (); i != this->messages_.end (); ++i) @@ -87,6 +91,7 @@ ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data) // Push the message... ACE_RMCast::Data downstream_data; + downstream_data.source = data.source; downstream_data.sequence_number = data.sequence_number; downstream_data.total_size = message->message_body ()->length (); downstream_data.fragment_offset = 0; diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h index 0bf0c3a61ee..6dc37e1ae19 100644 --- a/protocols/ace/RMCast/RMCast_Reassembly.h +++ b/protocols/ace/RMCast/RMCast_Reassembly.h @@ -24,19 +24,28 @@ class ACE_RMCast_Partial_Message; +//! Reassemble multiple data fragments into a single data message +/*! + Data messages may not fit in a single MTU in the transport layer, in + that case the application configure a RMCast_Fragment module on the + sender side. On the receiver side this layer reassemble the + messages sent from a <EM>single</EM> source, and passes the messages + up the stream. +*/ class ACE_RMCast_Export ACE_RMCast_Reassembly : public ACE_RMCast_Module { public: + //! Constructor ACE_RMCast_Reassembly (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Reassembly (void); - // Destructor // = The ACE_RMCast_Module methods virtual int data (ACE_RMCast::Data &data); private: + //! A mutex used to synchronize all the internal operations. ACE_SYNCH_MUTEX mutex_; typedef ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex> @@ -45,8 +54,9 @@ private: ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex> Message_Map_Iterator; + //! A map, indexed by sequence number, of the partially received + //! messages. Message_Map messages_; - // The array of partially received messages }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp index b90cf0ddf31..a996e1204d5 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.cpp +++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp @@ -96,6 +96,9 @@ ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_RB_Tree_Iterator_Base<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/tests/RMCast/Makefile b/tests/RMCast/Makefile index 38dbce49c4d..c1949350a25 100644 --- a/tests/RMCast/Makefile +++ b/tests/RMCast/Makefile @@ -11,6 +11,7 @@ BIN = RMCast_Fragment_Test \ RMCast_Reassembly_Test \ RMCast_UDP_Best_Effort_Test \ + RMCast_Membership_Test PSRC=$(addsuffix .cpp,$(BIN)) LDLIBS = -lACE_RMCast @@ -71,16 +72,16 @@ endif $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Synch.i \ $(ACE_ROOT)/ace/Synch_T.h \ $(ACE_ROOT)/ace/Event_Handler.h \ $(ACE_ROOT)/ace/Event_Handler.i \ $(ACE_ROOT)/ace/Synch_T.i \ $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ $(ACE_ROOT)/ace/Thread.i \ $(ACE_ROOT)/ace/Atomic_Op.i \ $(ACE_ROOT)/ace/Synch_T.cpp \ @@ -124,6 +125,10 @@ endif $(ACE_ROOT)/ace/Signal.i \ $(ACE_ROOT)/ace/Mem_Map.h \ $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Memory_Pool.i \ $(ACE_ROOT)/ace/Thread_Manager.i \ $(ACE_ROOT)/ace/Task.i \ @@ -172,6 +177,8 @@ endif $(ACE_ROOT)/ace/Service_Types.i \ $(ACE_ROOT)/ace/Service_Repository.i \ $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/Process_Mutex.h \ + $(ACE_ROOT)/ace/Process_Mutex.inl \ $(ACE_ROOT)/ace/WFMO_Reactor.i \ $(ACE_ROOT)/ace/Strategies.i \ $(ACE_ROOT)/ace/Message_Queue.i \ @@ -216,16 +223,16 @@ endif $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Synch.i \ $(ACE_ROOT)/ace/Synch_T.h \ $(ACE_ROOT)/ace/Event_Handler.h \ $(ACE_ROOT)/ace/Event_Handler.i \ $(ACE_ROOT)/ace/Synch_T.i \ $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ $(ACE_ROOT)/ace/Thread.i \ $(ACE_ROOT)/ace/Atomic_Op.i \ $(ACE_ROOT)/ace/Synch_T.cpp \ @@ -269,6 +276,10 @@ endif $(ACE_ROOT)/ace/Signal.i \ $(ACE_ROOT)/ace/Mem_Map.h \ $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Memory_Pool.i \ $(ACE_ROOT)/ace/Thread_Manager.i \ $(ACE_ROOT)/ace/Task.i \ @@ -317,6 +328,8 @@ endif $(ACE_ROOT)/ace/Service_Types.i \ $(ACE_ROOT)/ace/Service_Repository.i \ $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/Process_Mutex.h \ + $(ACE_ROOT)/ace/Process_Mutex.inl \ $(ACE_ROOT)/ace/WFMO_Reactor.i \ $(ACE_ROOT)/ace/Strategies.i \ $(ACE_ROOT)/ace/Message_Queue.i \ @@ -362,16 +375,16 @@ endif $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ - $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ - $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Synch.i \ $(ACE_ROOT)/ace/Synch_T.h \ $(ACE_ROOT)/ace/Event_Handler.h \ $(ACE_ROOT)/ace/Event_Handler.i \ $(ACE_ROOT)/ace/Synch_T.i \ $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ $(ACE_ROOT)/ace/Thread.i \ $(ACE_ROOT)/ace/Atomic_Op.i \ $(ACE_ROOT)/ace/Synch_T.cpp \ @@ -443,6 +456,10 @@ endif $(ACE_ROOT)/ace/Memory_Pool.h \ $(ACE_ROOT)/ace/Mem_Map.h \ $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ $(ACE_ROOT)/ace/Memory_Pool.i \ $(ACE_ROOT)/ace/Signal.i \ $(ACE_ROOT)/ace/SString.h \ @@ -491,6 +508,161 @@ endif $(ACE_ROOT)/ace/Service_Types.i \ $(ACE_ROOT)/ace/Service_Repository.i \ $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/Process_Mutex.h \ + $(ACE_ROOT)/ace/Process_Mutex.inl \ + $(ACE_ROOT)/ace/WFMO_Reactor.i \ + $(ACE_ROOT)/ace/Strategies.i \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Task_T.cpp \ + $(ACE_ROOT)/ace/Module.h \ + $(ACE_ROOT)/ace/Module.i \ + $(ACE_ROOT)/ace/Module.cpp \ + $(ACE_ROOT)/ace/Stream_Modules.h \ + $(ACE_ROOT)/ace/Stream_Modules.cpp + +.obj/RMCast_Membership_Test.o .obj/RMCast_Membership_Test.so .shobj/RMCast_Membership_Test.o .shobj/RMCast_Membership_Test.so: RMCast_Membership_Test.cpp ../test_config.h \ + $(ACE_ROOT)/ace/pre.h \ + $(ACE_ROOT)/ace/post.h \ + $(ACE_ROOT)/ace/ACE_export.h \ + $(ACE_ROOT)/ace/svc_export.h \ + $(ACE_ROOT)/ace/ace_wchar.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/OS_Dirent.h \ + $(ACE_ROOT)/ace/OS_Export.h \ + $(ACE_ROOT)/ace/OS_Dirent.inl \ + $(ACE_ROOT)/ace/OS_String.h \ + $(ACE_ROOT)/ace/OS_String.inl \ + $(ACE_ROOT)/ace/OS_Memory.h \ + $(ACE_ROOT)/ace/OS_Memory.inl \ + $(ACE_ROOT)/ace/OS_TLI.h \ + $(ACE_ROOT)/ace/OS_TLI.inl \ + $(ACE_ROOT)/ace/Min_Max.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Synch_T.cpp \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Singleton.cpp \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Managed_Object.cpp \ + $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Module.h \ + $(ACE_ROOT)/ace/RMCast/RMCast.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Export.h \ + $(ACE_ROOT)/ace/RMCast/RMCast.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Module.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Membership.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Containers_T.h \ + $(ACE_ROOT)/ace/Containers_T.i \ + $(ACE_ROOT)/ace/Containers_T.cpp \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.i \ + $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ + $(ACE_ROOT)/ace/Based_Pointer_Repository.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Free_List.cpp \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Malloc_T.cpp \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Membership.i \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/Message_Block_T.h \ + $(ACE_ROOT)/ace/Message_Block_T.i \ + $(ACE_ROOT)/ace/Message_Block_T.cpp \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Message_Queue_T.h \ + $(ACE_ROOT)/ace/Message_Queue_T.i \ + $(ACE_ROOT)/ace/Message_Queue_T.cpp \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/SString.h \ + $(ACE_ROOT)/ace/SString.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Timer_Queue_T.cpp \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Synch_Options.i \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Functor.h \ + $(ACE_ROOT)/ace/Functor.i \ + $(ACE_ROOT)/ace/Functor_T.h \ + $(ACE_ROOT)/ace/Functor_T.i \ + $(ACE_ROOT)/ace/Functor_T.cpp \ + $(ACE_ROOT)/ace/Hash_Map_Manager_T.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager_T.i \ + $(ACE_ROOT)/ace/Hash_Map_Manager_T.cpp \ + $(ACE_ROOT)/ace/Strategies_T.i \ + $(ACE_ROOT)/ace/Strategies_T.cpp \ + $(ACE_ROOT)/ace/Service_Repository.h \ + $(ACE_ROOT)/ace/Service_Types.h \ + $(ACE_ROOT)/ace/Service_Types.i \ + $(ACE_ROOT)/ace/Service_Repository.i \ + $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/Process_Mutex.h \ + $(ACE_ROOT)/ace/Process_Mutex.inl \ $(ACE_ROOT)/ace/WFMO_Reactor.i \ $(ACE_ROOT)/ace/Strategies.i \ $(ACE_ROOT)/ace/Message_Queue.i \ diff --git a/tests/RMCast/RMCast_Membership_Test.cpp b/tests/RMCast/RMCast_Membership_Test.cpp new file mode 100644 index 00000000000..54b5f9a5aa8 --- /dev/null +++ b/tests/RMCast/RMCast_Membership_Test.cpp @@ -0,0 +1,449 @@ +// $Id$ + +// ============================================================================ +// +// = DESCRIPTION +// Unit test for the UDP sending module of the RMCast library. +// +// = AUTHORS +// Carlos O'Ryan <coryan@uci.edu> +// +// ============================================================================ + +#include "test_config.h" +#include "ace/RMCast/RMCast_Proxy.h" +#include "ace/RMCast/RMCast_Membership.h" + +#include "ace/Task.h" + +ACE_RCSID(tests, RMCast_Membership_Test, "$Id$") + +const size_t message_size = 8 * 1024; +const int total_message_count = 40; + +// **************************************************************** + +//! Simple proxy for the ACE_RMCast_Membership test harness +/*! + * Implement a simple version of the ACE_RMCast_Proxy class used in + * the ACE_RMCast_Membership test harness. + */ +class Test_Proxy : public ACE_RMCast_Proxy +{ +public: + Test_Proxy (void); + + //! Get the flag to remember if this proxy has joined the group or + //! not. + int joined (void) const + { + return this->joined_; + } + //! Set the flag to remember if this proxy has joined the group or + //! not. + void joined (int j) + { + this->joined_ = j; + } + + //@{ + //! All the reply_ methods just return 0, there is no real remote + //! peer, this is just a test harness + virtual int reply_data (ACE_RMCast::Data &) + { + return 0; + } + virtual int reply_poll (ACE_RMCast::Poll &) + { + return 0; + } + virtual int reply_ack_join (ACE_RMCast::Ack_Join &) + { + return 0; + } + virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &) + { + return 0; + } + virtual int reply_ack (ACE_RMCast::Ack &) + { + return 0; + } + virtual int reply_join (ACE_RMCast::Join &) + { + return 0; + } + virtual int reply_leave (ACE_RMCast::Leave &) + { + return 0; + } + //@} + +private: + //! Remember if we joined the group already. + int joined_; +}; + +// **************************************************************** + +//! The number of proxies used in the test +/*! + * Not all member will be present in the group at the same time. But + * this variable controls the maximum number + */ +const size_t nproxy = 16; + +//! A simple module to receive the messages from ACE_RMCast_Membership +/*! + * The ACE_RMCast_Membership layer pushes messages to its next module + * when all the members have acked a message, when a new member joins, + * when a member leaves, etc. + * This class will verify that the messages are exactly what we + * expect. + */ +class Tester : public ACE_RMCast_Module +{ +public: + Tester (void); + + //! Run the test for \iterations times + void run (int iterations); + + virtual int join (ACE_RMCast::Join &join); + virtual int leave (ACE_RMCast::Leave &leave); + virtual int ack (ACE_RMCast::Ack &ack); + +private: + //! Add a few proxies to the group + void join_random (void); + + //! Remove a few proxies from the group + void leave_random (void); + + //! Generate a few ack messages from all the proxies currently in + //! the group + void generate_acks (int iterations); + +private: + //! The array of proxies + Test_Proxy proxy_[nproxy]; + + //! The Membership layer + ACE_RMCast_Membership membership_; + + //! Synchronize internal data structures + ACE_SYNCH_MUTEX lock_; + + //! The test is randomized to get better coverage. This is the seed + //! variable for the test + ACE_RANDR_TYPE seed_; +}; + +// **************************************************************** + +//! An Adapter to run Tester::run the test is a separate thread +class Task : public ACE_Task_Base +{ +public: + Task (Tester *tester); + + // = Read the documentation in "ace/Task.h" + int svc (void); + +private: + //! The tester object. + Tester *tester_; +}; + +// **************************************************************** + +int +main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("RMCast_Membership_Test")); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("This is ACE Version %u.%u.%u\n\n"), + ACE::major_version(), + ACE::minor_version(), + ACE::beta_version())); + + { + ACE_DEBUG ((LM_DEBUG, "Running single threaded test\n")); + //! Run the test in single threaded mode + Tester tester; + tester.run (100); + } + { + ACE_DEBUG ((LM_DEBUG, "Running multi threaded test\n")); + //! Run the test in multi-threaded mode + Tester tester; + Task task (&tester); + if (task.activate (THR_NEW_LWP|THR_JOINABLE, 4) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Cannot activate the threads\n"), 1); + ACE_Thread_Manager::instance ()->wait (); + } + + ACE_END_TEST; + return 0; +} + +// **************************************************************** + +Test_Proxy::Test_Proxy (void) + : joined_ (0) +{ +} + +// **************************************************************** + +Tester::Tester (void) + : seed_ (ACE_OS::gethrtime ()) +{ + // Initialize the stack... + this->membership_.next (this); + for (size_t i = 0; i != nproxy; ++i) + this->proxy_[i].next (&this->membership_); +} + +void +Tester::run (int iterations) +{ + for (int i = 0; i < iterations; ++i) + { + // Connect a few.... + this->join_random (); + + // Push acks.... + this->generate_acks (iterations); + + // Disconnect a few + this->leave_random (); + + // Push acks... + this->generate_acks (iterations / 10); + } +} + +int +Tester::join (ACE_RMCast::Join &join) +{ + if (join.source == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid join message in Tester\n"), + -1); + } + for (size_t i = 0; i != nproxy; ++i) + { + if (&this->proxy_[i] != join.source) + continue; + if (this->proxy_[i].joined () != 1) + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid state for proxy %d " + "in Tester::join\n"), + -1); + return 0; + } + // Not found + ACE_ERROR_RETURN ((LM_ERROR, "Unknown proxy in Tester::join\n"), -1); +} + +int +Tester::leave (ACE_RMCast::Leave &leave) +{ + if (leave.source == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid leave message in Tester\n"), + -1); + } + for (size_t i = 0; i != nproxy; ++i) + { + if (&this->proxy_[i] != leave.source) + continue; + if (this->proxy_[i].joined () != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid state for proxy %d " + "in Tester::leave\n"), + -1); + return 0; + } + // Not found + ACE_ERROR_RETURN ((LM_ERROR, "Unknown proxy in Tester::leave\n"), -1); +} + +int +Tester::ack (ACE_RMCast::Ack &ack) +{ + // After the membership layer the source makes no sense.... + if (ack.source == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid ack message in Tester\n"), + -1); + } + + // ACE_DEBUG ((LM_DEBUG, + // "Received ack in Tester %d,%d\n", + // ack.highest_in_sequence, + // ack.highest_received)); + + // Assume the lock is held, verify that the ack message satisfy the + // invariants... + ACE_UINT32 highest_in_sequence; + ACE_UINT32 highest_received; + int set = 0; + for (size_t i = 0; i != nproxy; ++i) + { + if (!this->proxy_[i].joined ()) + continue; + if (!set) + { + set = 1; + highest_in_sequence = this->proxy_[i].highest_in_sequence (); + highest_received = this->proxy_[i].highest_received (); + } + else + { + if (highest_in_sequence > + this->proxy_[i].highest_in_sequence ()) + { + highest_in_sequence = + this->proxy_[i].highest_in_sequence (); + } + if (highest_received < + this->proxy_[i].highest_received ()) + { + highest_received = + this->proxy_[i].highest_received (); + } + } + } + // No local proxy just return... + if (!set) + return 0; + + // Check the invariants + if (ack.highest_in_sequence != highest_in_sequence) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid highest_in_sequence in Ack\n"), + -1); + } + if (ack.highest_received != highest_received) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid highest_received in Ack\n"), + -1); + } + return 0; +} + +void +Tester::join_random (void) +{ + for (size_t i = 0; i != nproxy; ++i) + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + int r = ACE_OS::rand_r (this->seed_) % 100; + if (this->proxy_[i].joined () == 0 && r > 25) + { + this->proxy_[i].joined (1); + + ACE_RMCast::Join join; + join.source = &this->proxy_[i]; + // ACE_DEBUG ((LM_DEBUG, + // "Sending join mesage for proxy %d\n", + // i)); + this->proxy_[i].join (join); + } + } +} + +void +Tester::leave_random (void) +{ + for (size_t i = 0; i != nproxy; ++i) + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + int r = ACE_OS::rand_r (this->seed_) % 100; + if (this->proxy_[i].joined () == 1 && r > 75) + { + this->proxy_[i].joined (0); + + ACE_RMCast::Leave leave; + leave.source = &this->proxy_[i]; + // ACE_DEBUG ((LM_DEBUG, + // "Sending leave mesage for proxy %d\n", + // i)); + this->proxy_[i].leave (leave); + } + } +} + +void +Tester::generate_acks (int iterations) +{ + int n = 0; + for (size_t i = 0; n < iterations && i != nproxy; ++i, ++n) + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + int r = ACE_OS::rand_r (this->seed_) % 10; + if (this->proxy_[i].joined () == 0) + continue; + + ACE_RMCast::Ack ack; + ack.source = &this->proxy_[i]; + ack.highest_in_sequence = + this->proxy_[i].highest_in_sequence (); + ack.highest_received = + this->proxy_[i].highest_received (); + + // we randomly perform one of 3 actions, with different + // probabilities + switch (r) + { + case 0: + // Ack the same data that we already have: + break; + case 1: + case 2: + case 3: + case 4: + // Simulate out of sequence messages... + ack.highest_received++; + break; + default: + if (ack.highest_received > ack.highest_in_sequence) + ack.highest_in_sequence++; + break; + } + // ACE_DEBUG ((LM_DEBUG, + // "Sending ack message (%d,%d) through proxy %d\n", + // ack.highest_in_sequence, + // ack.highest_received, + // i)); + int result = this->proxy_[i].ack (ack); + if (result != 0) + { + ACE_ERROR ((LM_ERROR, + "Returned %d in proxy %d\n", + result, i)); + } + } +} + +// **************************************************************** + +Task::Task (Tester *tester) + : tester_ (tester) +{ +} + +int +Task::svc (void) +{ + this->tester_->run (100); + return 0; +} |