summaryrefslogtreecommitdiff
path: root/ace/RMCast/RMCast_Retransmission.cpp
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-02 18:40:06 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-02 18:40:06 +0000
commit3c7edb739b596313a3e15fe1373bd488c2f37009 (patch)
treeec3f3e0df41c8f76d9c1ae3d472221e78a23d61f /ace/RMCast/RMCast_Retransmission.cpp
parent6c8cdfe85f70b9be20d1dd80f9730dae491ff403 (diff)
downloadATCD-3c7edb739b596313a3e15fe1373bd488c2f37009.tar.gz
ChangeLogTag:Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'ace/RMCast/RMCast_Retransmission.cpp')
-rw-r--r--ace/RMCast/RMCast_Retransmission.cpp127
1 files changed, 105 insertions, 22 deletions
diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp
index a996e1204d5..7e38cdf7c97 100644
--- a/ace/RMCast/RMCast_Retransmission.cpp
+++ b/ace/RMCast/RMCast_Retransmission.cpp
@@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void)
{
}
+class ACE_RMCast_Resend_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next,
+ ACE_UINT32 max_sequence_number)
+ : n (0)
+ , next_ (next)
+ , max_sequence_number_ (max_sequence_number)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const & item)
+ {
+ if (key > this->max_sequence_number_)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::resend - message %d resent\n",
+ key));
+ ACE_RMCast::Data data = item;
+ int r = this->next_->data (data);
+ if (r != 0)
+ return r;
+ n++;
+ return 0;
+ }
+
+ int n;
+
+private:
+ ACE_RMCast_Module *next_;
+
+ ACE_UINT32 max_sequence_number_;
+};
+
int
-ACE_RMCast_Retransmission::close (void)
+ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number)
{
- Messages_Iterator end = this->messages_.end ();
+ if (this->next () == 0)
+ return 0;
- for (Messages_Iterator i = this->messages_.begin ();
- i != end;
- ++i)
- {
- ACE_Message_Block::release ((*i).item ().payload);
- }
- this->messages_.close ();
+ ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number);
+
+ if (this->messages_.for_each (&worker) == -1)
+ return -1;
+
+ return worker.n;
+}
+
+int
+ACE_RMCast_Retransmission::close (void)
+{
+ // @@
return 0;
}
@@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data)
int r = this->next ()->data (data);
if (r == 0)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
ACE_RMCast::Data copy = data;
copy.payload = ACE_Message_Block::duplicate (data.payload);
r = this->messages_.bind (data.sequence_number, copy);
@@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
ACE_RMCast::Ack_Join ack_join;
+#if 0
+ // @@
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
Messages_Iterator end = this->messages_.end ();
Messages_Iterator begin = this->messages_.begin ();
@@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
ack_join.next_sequence_number = (*begin).key ();
}
}
+#endif
(void) join.source->reply_ack_join (ack_join);
// @@ We should force a full retransmission of all the messages!
@@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
}
+class ACE_RMCast_Ack_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack,
+ ACE_RMCast_Retransmission::Messages::Write_Guard &g,
+ ACE_RMCast_Retransmission::Messages *messages)
+ : ack_ (ack)
+ , ace_mon_ (g)
+ , messages_ (messages)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const &)
+ {
+ if (key >= this->ack_.next_expected)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::ack - message %d erased\n",
+ key));
+ return this->messages_->unbind_i (this->ace_mon_, key);
+ }
+
+private:
+ ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&);
+ ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&);
+
+private:
+ ACE_RMCast::Ack &ack_;
+
+ ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_;
+
+ ACE_RMCast_Retransmission::Messages *messages_;
+};
+
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- for (Messages_Iterator i = this->messages_.begin ();
- i != this->messages_.end ();
- /* do nothing */)
- {
- ACE_UINT32 key = (*i).key ();
- if (key > ack.highest_in_sequence)
- break;
- this->messages_.unbind (key);
- }
- return 0;
+ Messages::Write_Guard ace_mon (this->messages_.mutex_,
+ this->messages_.cond_,
+ this->messages_.pending_writes_,
+ this->messages_.writing_,
+ this->messages_.collection_);
+
+ ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
+
+ return this->messages_.for_each (&worker);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
@@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC
template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
+template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>;
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */