summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/RMCast_Reassembly.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/ace/RMCast/RMCast_Reassembly.cpp')
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.cpp90
1 files changed, 44 insertions, 46 deletions
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp
index ba2e9b79c1a..be56e6cd9e8 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.cpp
+++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp
@@ -1,26 +1,34 @@
// $Id$
+#ifndef ACE_RMCAST_REASSEMBLY_C
+#define ACE_RMCAST_REASSEMBLY_C
+
#include "RMCast_Reassembly.h"
-#include "RMCast_Partial_Message.h"
-#include "ace/Message_Block.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "RMCast_Partial_Message.h"
+
#if !defined (__ACE_INLINE__)
#include "RMCast_Reassembly.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(ace, RMCast_Reassembly, "$Id$")
-ACE_RMCast_Reassembly::
-ACE_RMCast_Reassembly (void)
- : ACE_RMCast_Module ()
+
+template <ACE_SYNCH_DECL>
+ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
+ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr,
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq)
+ : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq)
{
}
-ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
+template <ACE_SYNCH_DECL>
+ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
+~ACE_RMCast_Reassembly (void)
{
for (Message_Map_Iterator i = this->messages_.begin ();
i != this->messages_.end ();
@@ -33,30 +41,39 @@ ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
this->messages_.unbind_all ();
}
-int
-ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
+template <ACE_SYNCH_DECL> int
+ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
{
- if (this->next () == 0)
- return 0;
+ ACE_UINT32 header[3];
+ size_t fragment_header_size = sizeof(header);
- if (data.payload->length () + data.fragment_offset > data.total_size)
- {
- ACE_DEBUG ((LM_DEBUG,
- "RMCast_Reassembly::data - invalid size\n"));
- return -1; // Corrupt message?
- }
+ if (mb->length () < fragment_header_size)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Message block too small, "
+ "not enough room for the header\n"),
+ -1);
+
+ ACE_OS::memcpy (header, mb->rd_ptr (), fragment_header_size);
+
+ ACE_UINT32 message_sequence_number = ACE_NTOHL(header[0]);
+ ACE_UINT32 offset = ACE_NTOHL(header[1]);
+ ACE_UINT32 message_size = ACE_NTOHL(header[2]);
+
+ if (mb->length () + offset > message_size)
+ return -1; // Corrupt message?
ACE_RMCast_Partial_Message *message;
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- if (this->messages_.find (data.sequence_number, message) == -1)
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1);
+ if (this->messages_.find (message_sequence_number, message) == -1)
{
ACE_NEW_RETURN (message,
- ACE_RMCast_Partial_Message (data.total_size),
+ ACE_RMCast_Partial_Message (message_size),
-1);
- if (this->messages_.bind (data.sequence_number,
+ if (this->messages_.bind (message_sequence_number,
message) == -1)
return -1; // Internal error?
}
@@ -66,45 +83,26 @@ ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
if (message == 0)
return 0;
- if (message->fragment_received (data.total_size,
- data.fragment_offset,
- data.payload) == -1)
- {
- ACE_DEBUG ((LM_DEBUG,
- "Error in fragment_received\n"));
- return -1;
- }
+ if (message->fragment_received (message_size,
+ offset,
+ mb) == -1)
+ return -1;
if (!message->is_complete ())
return 0;
// Remove the message from the collection, but leave a marker
// to indicate that it was already received...
- if (this->messages_.rebind (data.sequence_number,
- (ACE_RMCast_Partial_Message*)0) == -1)
+ if (this->messages_.rebind (message_sequence_number, 0) == -1)
return -1;
}
// Push the message...
- ACE_RMCast::Data downstream_data;
- downstream_data.sequence_number = data.sequence_number;
- downstream_data.total_size = message->message_body ()->length ();
- downstream_data.fragment_offset = 0;
- downstream_data.payload = message->message_body ();
-
- int r = this->next ()->data (downstream_data);
+ int r = this->put_next (message->message_body (), tv);
delete message;
return r;
}
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-template class ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Manager_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator_Base_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex >;
-template class ACE_Hash_Map_Entry<ACE_UINT32,ACE_RMCast_Partial_Message*>;
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+#endif /* ACE_RMCAST_REASSEMBLY_C */