diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-02 18:40:06 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-02 18:40:06 +0000 |
commit | 3c7edb739b596313a3e15fe1373bd488c2f37009 (patch) | |
tree | ec3f3e0df41c8f76d9c1ae3d472221e78a23d61f /ace/RMCast/RMCast_Retransmission.cpp | |
parent | 6c8cdfe85f70b9be20d1dd80f9730dae491ff403 (diff) | |
download | ATCD-3c7edb739b596313a3e15fe1373bd488c2f37009.tar.gz |
ChangeLogTag:Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'ace/RMCast/RMCast_Retransmission.cpp')
-rw-r--r-- | ace/RMCast/RMCast_Retransmission.cpp | 127 |
1 files changed, 105 insertions, 22 deletions
diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp index a996e1204d5..7e38cdf7c97 100644 --- a/ace/RMCast/RMCast_Retransmission.cpp +++ b/ace/RMCast/RMCast_Retransmission.cpp @@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void) { } +class ACE_RMCast_Resend_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next, + ACE_UINT32 max_sequence_number) + : n (0) + , next_ (next) + , max_sequence_number_ (max_sequence_number) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const & item) + { + if (key > this->max_sequence_number_) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::resend - message %d resent\n", + key)); + ACE_RMCast::Data data = item; + int r = this->next_->data (data); + if (r != 0) + return r; + n++; + return 0; + } + + int n; + +private: + ACE_RMCast_Module *next_; + + ACE_UINT32 max_sequence_number_; +}; + int -ACE_RMCast_Retransmission::close (void) +ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number) { - Messages_Iterator end = this->messages_.end (); + if (this->next () == 0) + return 0; - for (Messages_Iterator i = this->messages_.begin (); - i != end; - ++i) - { - ACE_Message_Block::release ((*i).item ().payload); - } - this->messages_.close (); + ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number); + + if (this->messages_.for_each (&worker) == -1) + return -1; + + return worker.n; +} + +int +ACE_RMCast_Retransmission::close (void) +{ + // @@ return 0; } @@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data) int r = this->next ()->data (data); if (r == 0) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ACE_RMCast::Data copy = data; copy.payload = ACE_Message_Block::duplicate (data.payload); r = this->messages_.bind (data.sequence_number, copy); @@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; ACE_RMCast::Ack_Join ack_join; +#if 0 + // @@ { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); Messages_Iterator end = this->messages_.end (); Messages_Iterator begin = this->messages_.begin (); @@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) ack_join.next_sequence_number = (*begin).key (); } } +#endif (void) join.source->reply_ack_join (ack_join); // @@ We should force a full retransmission of all the messages! @@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; } +class ACE_RMCast_Ack_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack, + ACE_RMCast_Retransmission::Messages::Write_Guard &g, + ACE_RMCast_Retransmission::Messages *messages) + : ack_ (ack) + , ace_mon_ (g) + , messages_ (messages) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const &) + { + if (key >= this->ack_.next_expected) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::ack - message %d erased\n", + key)); + return this->messages_->unbind_i (this->ace_mon_, key); + } + +private: + ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&); + ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&); + +private: + ACE_RMCast::Ack &ack_; + + ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_; + + ACE_RMCast_Retransmission::Messages *messages_; +}; + int ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - for (Messages_Iterator i = this->messages_.begin (); - i != this->messages_.end (); - /* do nothing */) - { - ACE_UINT32 key = (*i).key (); - if (key > ack.highest_in_sequence) - break; - this->messages_.unbind (key); - } - return 0; + Messages::Write_Guard ace_mon (this->messages_.mutex_, + this->messages_.cond_, + this->messages_.pending_writes_, + this->messages_.writing_, + this->messages_.collection_); + + ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_); + + return this->messages_.for_each (&worker); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) @@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; +template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>; + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |