summaryrefslogtreecommitdiff
path: root/protocols
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-04 19:28:28 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-04 19:28:28 +0000
commitcd2323da3c06813b88c78946e599933f023adea2 (patch)
tree1688c8f56c860ce8ab7e223e943c401981c8ca62 /protocols
parent695ca6adf21fb6fb6ddc732822eb326747df59fe (diff)
downloadATCD-cd2323da3c06813b88c78946e599933f023adea2.tar.gz
ChangeLogTag:Wed Oct 04 12:23:34 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols')
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.cpp17
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.h56
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.i7
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.cpp11
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.h4
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.cpp16
-rw-r--r--protocols/ace/RMCast/RMCast_Retransmission.cpp23
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.cpp5
8 files changed, 82 insertions, 57 deletions
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
index 20a76e559e0..eeb3b9422c2 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -60,7 +60,7 @@ template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
for_each (ACE_RMCast_Worker<KEY,ITEM> *worker)
{
- Read_Guard ace_mon (this->mutex_, this->collection_);
+ Read_Guard ace_mon (*this);
ITERATOR end = ace_mon.collection->collection.end ();
for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
@@ -74,6 +74,19 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
return 0;
}
+template<class KEY, class ITEM, class C, class ITERATOR> KEY
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,ITERATOR>::first_key (void)
+{
+ Read_Guard ace_mon (*this);
+ ITERATOR end = ace_mon.collection->collection.end ();
+ ITERATOR begin = ace_mon.collection->collection.begin ();
+ if (begin == end)
+ {
+ return KEY ();
+ }
+ return (*begin).key ();
+}
+
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
ITEM const & i)
@@ -121,7 +134,7 @@ ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>::ACE_RMCast_Copy_On_Writ
template<class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container)
+ ACE_RMCast_Copy_On_Write_Write_Guard (Container &container)
: copy (0)
, mutex (container.mutex_)
, cond (container.cond_)
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
index e06aa5f1689..d3513a05d2d 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.h
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
@@ -33,29 +33,8 @@ private:
// ****************************************************************
-//! Implement a read guard for a reference counted collection
template<class COLLECTION, class ITERATOR>
-class ACE_RMCast_Copy_On_Write_Read_Guard
-{
-public:
- typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
-
- //! Constructor
- ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex,
- Collection *&collection);
-
- //! Destructor
- ~ACE_RMCast_Copy_On_Write_Read_Guard (void);
-
- //! A reference to the collection
- Collection *collection;
-
-private:
- //! Synchronization
- ACE_SYNCH_MUTEX &mutex_;
-};
-
-// ****************************************************************
+class ACE_RMCast_Copy_On_Write_Read_Guard;
template<class COLLECTION, class ITERATOR>
class ACE_RMCast_Copy_On_Write_Write_Guard;
@@ -72,6 +51,9 @@ public:
//! Let the Write_Guard access the internal fields.
friend ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>;
+ //! Let the Read_Guard access the internal fields.
+ friend ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>;
+
//! A shorter name for the actual collection type
typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
@@ -98,6 +80,30 @@ protected:
// ****************************************************************
+//! Implement a read guard for a reference counted collection
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Read_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+ typedef ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> Container;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Read_Guard (Container &container);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void);
+
+ //! A reference to the collection
+ Collection *collection;
+
+private:
+ //! Synchronization
+ ACE_SYNCH_MUTEX &mutex_;
+};
+
+// ****************************************************************
+
//! Implement the write guard for a reference counted collecion
/*!
* This helper class atomically increments the reference count of a
@@ -109,9 +115,10 @@ class ACE_RMCast_Copy_On_Write_Write_Guard
{
public:
typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+ typedef ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> Container;
//! Constructor
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container);
+ ACE_RMCast_Copy_On_Write_Write_Guard (Container &container);
//! Destructor
~ACE_RMCast_Copy_On_Write_Write_Guard (void);
@@ -171,6 +178,9 @@ public:
//! Iterate over all the elements invoking \param worker on each one.
int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker);
+ //! Get the first key
+ KEY first_key (void);
+
//! Add a new element
int bind (KEY const & key, ITEM const & item);
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
index 354dd51bf5a..3c069c84eb6 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.i
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
@@ -11,13 +11,12 @@ ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::
template<class COLLECTION, class ITERATOR> ACE_INLINE
ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
- ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m,
- Collection*& collection_ref)
+ ACE_RMCast_Copy_On_Write_Read_Guard (Container &container)
: collection (0)
- , mutex_ (m)
+ , mutex_ (container.mutex_)
{
ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
- this->collection = collection_ref;
+ this->collection = container.collection_;
this->collection->_incr_refcnt ();
}
diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp
index f6b2bbec5e5..2257f9a2bd7 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.cpp
+++ b/protocols/ace/RMCast/RMCast_Proxy.cpp
@@ -33,3 +33,14 @@ ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack)
this->highest_received_ = ack.highest_received;
return this->ACE_RMCast_Module::ack (ack);
}
+
+int
+ACE_RMCast_Proxy::reply_ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ if (this->next_expected_ < ack_join.next_sequence_number)
+ {
+ this->next_expected_ = ack_join.next_sequence_number;
+ this->highest_received_ = ack_join.next_sequence_number;
+ }
+ return 0;
+}
diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h
index e0e6afe79b1..d774efe4b2c 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.h
+++ b/protocols/ace/RMCast/RMCast_Proxy.h
@@ -48,7 +48,7 @@ public:
//! Destructor
virtual ~ACE_RMCast_Proxy (void);
-
+
//! Return the next sequence number expected by the peer. Only
//! applies to remote receiver proxies.
/*!
@@ -72,7 +72,7 @@ public:
*/
virtual int reply_data (ACE_RMCast::Data &) = 0;
virtual int reply_poll (ACE_RMCast::Poll &) = 0;
- virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0;
+ virtual int reply_ack_join (ACE_RMCast::Ack_Join &);
virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &) = 0;
virtual int reply_ack (ACE_RMCast::Ack &) = 0;
virtual int reply_join (ACE_RMCast::Join &) = 0;
diff --git a/protocols/ace/RMCast/RMCast_Reordering.cpp b/protocols/ace/RMCast/RMCast_Reordering.cpp
index f63c6c66f34..a984fa9b989 100644
--- a/protocols/ace/RMCast/RMCast_Reordering.cpp
+++ b/protocols/ace/RMCast/RMCast_Reordering.cpp
@@ -52,14 +52,14 @@ ACE_RMCast_Reordering::data (ACE_RMCast::Data &data)
// is "old".
this->next_expected_++;
-
+
// Right message, process as many messages as possible from
// the queue, then ack the right level...
// NOTE: we cannot release the mutex while dispatching
// events, otherwise: how do we stop other threads from
// delivering messages out of order? I.E. what if the
- // next thread receives the next message?
+ // next thread receives the next message?
if (this->next () != 0)
(void) this->next ()->data (data);
@@ -83,7 +83,9 @@ ACE_RMCast_Reordering::data (ACE_RMCast::Data &data)
{
this->highest_received_ = data.sequence_number;
}
- (void) this->messages_.bind (data.sequence_number, data);
+ ACE_RMCast::Data new_data = data;
+ new_data.payload = ACE_Message_Block::duplicate (data.payload);
+ (void) this->messages_.bind (data.sequence_number, new_data);
// re-ack, otherwise save it and ack.
}
@@ -109,10 +111,11 @@ ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join)
Messages_Iterator i = this->messages_.begin ();
Messages_Iterator end = this->messages_.end ();
-
+
while (i != end
&& (*i).key () < ack_join.next_sequence_number)
{
+ ACE_Message_Block::release ((*i).item ().payload);
this->messages_.unbind ((*i).key ());
i = this->messages_.begin ();
}
@@ -123,7 +126,7 @@ ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join)
this->push_queued_messages ();
}
-
+
return 0;
}
@@ -132,7 +135,7 @@ ACE_RMCast_Reordering::push_queued_messages (void)
{
Messages_Iterator i = this->messages_.begin ();
Messages_Iterator end = this->messages_.end ();
-
+
while (i != end
&& (*i).key () == this->next_expected_)
{
@@ -142,6 +145,7 @@ ACE_RMCast_Reordering::push_queued_messages (void)
this->next ()->data (data);
}
+ ACE_Message_Block::release ((*i).item ().payload);
this->messages_.unbind ((*i).key ());
i = this->messages_.begin ();
this->next_expected_++;
diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp
index 4d40a59ac5e..c9db70cbf63 100644
--- a/protocols/ace/RMCast/RMCast_Retransmission.cpp
+++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp
@@ -96,23 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
ACE_RMCast::Ack_Join ack_join;
-#if 0
- // @@
- {
- Messages_Iterator end = this->messages_.end ();
- Messages_Iterator begin = this->messages_.begin ();
-
- ack_join.source = 0;
- if (begin == end)
- {
- ack_join.next_sequence_number = 0;
- }
- else
- {
- ack_join.next_sequence_number = (*begin).key ();
- }
- }
-#endif
+ ack_join.source = 0;
+ ack_join.next_sequence_number = this->messages_.first_key ();
+
(void) join.source->reply_ack_join (ack_join);
// @@ We should force a full retransmission of all the messages!
@@ -134,13 +120,14 @@ public:
}
int work (ACE_UINT32 const & key,
- ACE_RMCast::Data const &)
+ ACE_RMCast::Data const &item)
{
if (key >= this->ack_.next_expected)
return 0;
// ACE_DEBUG ((LM_DEBUG,
// " Retransmission::ack - message %d erased\n",
// key));
+ ACE_Message_Block::release (item.payload);
return this->messages_->unbind_i (this->ace_mon_, key);
}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
index 2eb0983b171..fbb37eca2d6 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -147,9 +147,10 @@ ACE_RMCast_UDP_Proxy::reply_poll (ACE_RMCast::Poll &poll)
}
int
-ACE_RMCast_UDP_Proxy::reply_ack_join (ACE_RMCast::Ack_Join &ack)
+ACE_RMCast_UDP_Proxy::reply_ack_join (ACE_RMCast::Ack_Join &ack_join)
{
- return this->io_udp_->send_ack_join (ack, this->peer_addr_);
+ (void) this->ACE_RMCast_Proxy::reply_ack_join (ack_join);
+ return this->io_udp_->send_ack_join (ack_join, this->peer_addr_);
}
int