diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-04 19:28:28 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-04 19:28:28 +0000 |
commit | cd2323da3c06813b88c78946e599933f023adea2 (patch) | |
tree | 1688c8f56c860ce8ab7e223e943c401981c8ca62 /protocols | |
parent | 695ca6adf21fb6fb6ddc732822eb326747df59fe (diff) | |
download | ATCD-cd2323da3c06813b88c78946e599933f023adea2.tar.gz |
ChangeLogTag:Wed Oct 04 12:23:34 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/ace/RMCast/RMCast_Copy_On_Write.cpp | 17 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Copy_On_Write.h | 56 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Copy_On_Write.i | 7 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Proxy.cpp | 11 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Proxy.h | 4 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reordering.cpp | 16 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Retransmission.cpp | 23 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_UDP_Proxy.cpp | 5 |
8 files changed, 82 insertions, 57 deletions
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp index 20a76e559e0..eeb3b9422c2 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp @@ -60,7 +60,7 @@ template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: for_each (ACE_RMCast_Worker<KEY,ITEM> *worker) { - Read_Guard ace_mon (this->mutex_, this->collection_); + Read_Guard ace_mon (*this); ITERATOR end = ace_mon.collection->collection.end (); for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i) @@ -74,6 +74,19 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: return 0; } +template<class KEY, class ITEM, class C, class ITERATOR> KEY +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,ITERATOR>::first_key (void) +{ + Read_Guard ace_mon (*this); + ITERATOR end = ace_mon.collection->collection.end (); + ITERATOR begin = ace_mon.collection->collection.begin (); + if (begin == end) + { + return KEY (); + } + return (*begin).key (); +} + template<class KEY, class ITEM, class C, class I> int ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, ITEM const & i) @@ -121,7 +134,7 @@ ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>::ACE_RMCast_Copy_On_Writ template<class COLLECTION, class ITERATOR> ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: - ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container) + ACE_RMCast_Copy_On_Write_Write_Guard (Container &container) : copy (0) , mutex (container.mutex_) , cond (container.cond_) diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h index e06aa5f1689..d3513a05d2d 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.h +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h @@ -33,29 +33,8 @@ private: // **************************************************************** -//! Implement a read guard for a reference counted collection template<class COLLECTION, class ITERATOR> -class ACE_RMCast_Copy_On_Write_Read_Guard -{ -public: - typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; - - //! Constructor - ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex, - Collection *&collection); - - //! Destructor - ~ACE_RMCast_Copy_On_Write_Read_Guard (void); - - //! A reference to the collection - Collection *collection; - -private: - //! Synchronization - ACE_SYNCH_MUTEX &mutex_; -}; - -// **************************************************************** +class ACE_RMCast_Copy_On_Write_Read_Guard; template<class COLLECTION, class ITERATOR> class ACE_RMCast_Copy_On_Write_Write_Guard; @@ -72,6 +51,9 @@ public: //! Let the Write_Guard access the internal fields. friend ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>; + //! Let the Read_Guard access the internal fields. + friend ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>; + //! A shorter name for the actual collection type typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; @@ -98,6 +80,30 @@ protected: // **************************************************************** +//! Implement a read guard for a reference counted collection +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Read_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + typedef ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> Container; + + //! Constructor + ACE_RMCast_Copy_On_Write_Read_Guard (Container &container); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Read_Guard (void); + + //! A reference to the collection + Collection *collection; + +private: + //! Synchronization + ACE_SYNCH_MUTEX &mutex_; +}; + +// **************************************************************** + //! Implement the write guard for a reference counted collecion /*! * This helper class atomically increments the reference count of a @@ -109,9 +115,10 @@ class ACE_RMCast_Copy_On_Write_Write_Guard { public: typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + typedef ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> Container; //! Constructor - ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container); + ACE_RMCast_Copy_On_Write_Write_Guard (Container &container); //! Destructor ~ACE_RMCast_Copy_On_Write_Write_Guard (void); @@ -171,6 +178,9 @@ public: //! Iterate over all the elements invoking \param worker on each one. int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker); + //! Get the first key + KEY first_key (void); + //! Add a new element int bind (KEY const & key, ITEM const & item); diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i index 354dd51bf5a..3c069c84eb6 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.i +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i @@ -11,13 +11,12 @@ ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>:: template<class COLLECTION, class ITERATOR> ACE_INLINE ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: - ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m, - Collection*& collection_ref) + ACE_RMCast_Copy_On_Write_Read_Guard (Container &container) : collection (0) - , mutex_ (m) + , mutex_ (container.mutex_) { ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); - this->collection = collection_ref; + this->collection = container.collection_; this->collection->_incr_refcnt (); } diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp index f6b2bbec5e5..2257f9a2bd7 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_Proxy.cpp @@ -33,3 +33,14 @@ ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack) this->highest_received_ = ack.highest_received; return this->ACE_RMCast_Module::ack (ack); } + +int +ACE_RMCast_Proxy::reply_ack_join (ACE_RMCast::Ack_Join &ack_join) +{ + if (this->next_expected_ < ack_join.next_sequence_number) + { + this->next_expected_ = ack_join.next_sequence_number; + this->highest_received_ = ack_join.next_sequence_number; + } + return 0; +} diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h index e0e6afe79b1..d774efe4b2c 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.h +++ b/protocols/ace/RMCast/RMCast_Proxy.h @@ -48,7 +48,7 @@ public: //! Destructor virtual ~ACE_RMCast_Proxy (void); - + //! Return the next sequence number expected by the peer. Only //! applies to remote receiver proxies. /*! @@ -72,7 +72,7 @@ public: */ virtual int reply_data (ACE_RMCast::Data &) = 0; virtual int reply_poll (ACE_RMCast::Poll &) = 0; - virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0; + virtual int reply_ack_join (ACE_RMCast::Ack_Join &); virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &) = 0; virtual int reply_ack (ACE_RMCast::Ack &) = 0; virtual int reply_join (ACE_RMCast::Join &) = 0; diff --git a/protocols/ace/RMCast/RMCast_Reordering.cpp b/protocols/ace/RMCast/RMCast_Reordering.cpp index f63c6c66f34..a984fa9b989 100644 --- a/protocols/ace/RMCast/RMCast_Reordering.cpp +++ b/protocols/ace/RMCast/RMCast_Reordering.cpp @@ -52,14 +52,14 @@ ACE_RMCast_Reordering::data (ACE_RMCast::Data &data) // is "old". this->next_expected_++; - + // Right message, process as many messages as possible from // the queue, then ack the right level... // NOTE: we cannot release the mutex while dispatching // events, otherwise: how do we stop other threads from // delivering messages out of order? I.E. what if the - // next thread receives the next message? + // next thread receives the next message? if (this->next () != 0) (void) this->next ()->data (data); @@ -83,7 +83,9 @@ ACE_RMCast_Reordering::data (ACE_RMCast::Data &data) { this->highest_received_ = data.sequence_number; } - (void) this->messages_.bind (data.sequence_number, data); + ACE_RMCast::Data new_data = data; + new_data.payload = ACE_Message_Block::duplicate (data.payload); + (void) this->messages_.bind (data.sequence_number, new_data); // re-ack, otherwise save it and ack. } @@ -109,10 +111,11 @@ ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join) Messages_Iterator i = this->messages_.begin (); Messages_Iterator end = this->messages_.end (); - + while (i != end && (*i).key () < ack_join.next_sequence_number) { + ACE_Message_Block::release ((*i).item ().payload); this->messages_.unbind ((*i).key ()); i = this->messages_.begin (); } @@ -123,7 +126,7 @@ ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join) this->push_queued_messages (); } - + return 0; } @@ -132,7 +135,7 @@ ACE_RMCast_Reordering::push_queued_messages (void) { Messages_Iterator i = this->messages_.begin (); Messages_Iterator end = this->messages_.end (); - + while (i != end && (*i).key () == this->next_expected_) { @@ -142,6 +145,7 @@ ACE_RMCast_Reordering::push_queued_messages (void) this->next ()->data (data); } + ACE_Message_Block::release ((*i).item ().payload); this->messages_.unbind ((*i).key ()); i = this->messages_.begin (); this->next_expected_++; diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp index 4d40a59ac5e..c9db70cbf63 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.cpp +++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp @@ -96,23 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; ACE_RMCast::Ack_Join ack_join; -#if 0 - // @@ - { - Messages_Iterator end = this->messages_.end (); - Messages_Iterator begin = this->messages_.begin (); - - ack_join.source = 0; - if (begin == end) - { - ack_join.next_sequence_number = 0; - } - else - { - ack_join.next_sequence_number = (*begin).key (); - } - } -#endif + ack_join.source = 0; + ack_join.next_sequence_number = this->messages_.first_key (); + (void) join.source->reply_ack_join (ack_join); // @@ We should force a full retransmission of all the messages! @@ -134,13 +120,14 @@ public: } int work (ACE_UINT32 const & key, - ACE_RMCast::Data const &) + ACE_RMCast::Data const &item) { if (key >= this->ack_.next_expected) return 0; // ACE_DEBUG ((LM_DEBUG, // " Retransmission::ack - message %d erased\n", // key)); + ACE_Message_Block::release (item.payload); return this->messages_->unbind_i (this->ace_mon_, key); } diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp index 2eb0983b171..fbb37eca2d6 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp @@ -147,9 +147,10 @@ ACE_RMCast_UDP_Proxy::reply_poll (ACE_RMCast::Poll &poll) } int -ACE_RMCast_UDP_Proxy::reply_ack_join (ACE_RMCast::Ack_Join &ack) +ACE_RMCast_UDP_Proxy::reply_ack_join (ACE_RMCast::Ack_Join &ack_join) { - return this->io_udp_->send_ack_join (ack, this->peer_addr_); + (void) this->ACE_RMCast_Proxy::reply_ack_join (ack_join); + return this->io_udp_->send_ack_join (ack_join, this->peer_addr_); } int |