summaryrefslogtreecommitdiff
path: root/ace/RMCast/RMCast_Reassembly.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/RMCast/RMCast_Reassembly.cpp')
-rw-r--r--ace/RMCast/RMCast_Reassembly.cpp90
1 files changed, 46 insertions, 44 deletions
diff --git a/ace/RMCast/RMCast_Reassembly.cpp b/ace/RMCast/RMCast_Reassembly.cpp
index be56e6cd9e8..a52791e1ebf 100644
--- a/ace/RMCast/RMCast_Reassembly.cpp
+++ b/ace/RMCast/RMCast_Reassembly.cpp
@@ -1,34 +1,26 @@
// $Id$
-#ifndef ACE_RMCAST_REASSEMBLY_C
-#define ACE_RMCAST_REASSEMBLY_C
-
#include "RMCast_Reassembly.h"
+#include "RMCast_Partial_Message.h"
+#include "ace/Message_Block.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "RMCast_Partial_Message.h"
-
#if !defined (__ACE_INLINE__)
#include "RMCast_Reassembly.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(ace, RMCast_Reassembly, "$Id$")
-
-template <ACE_SYNCH_DECL>
-ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
-ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr,
- ACE_Message_Queue<ACE_SYNCH_USE> *mq)
- : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq)
+ACE_RMCast_Reassembly::
+ACE_RMCast_Reassembly (void)
+ : ACE_RMCast_Module ()
{
}
-template <ACE_SYNCH_DECL>
-ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
-~ACE_RMCast_Reassembly (void)
+ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
{
for (Message_Map_Iterator i = this->messages_.begin ();
i != this->messages_.end ();
@@ -41,39 +33,30 @@ ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
this->messages_.unbind_all ();
}
-template <ACE_SYNCH_DECL> int
-ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
- ACE_Time_Value *tv)
+int
+ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
{
- ACE_UINT32 header[3];
- size_t fragment_header_size = sizeof(header);
-
- if (mb->length () < fragment_header_size)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Message block too small, "
- "not enough room for the header\n"),
- -1);
-
- ACE_OS::memcpy (header, mb->rd_ptr (), fragment_header_size);
-
- ACE_UINT32 message_sequence_number = ACE_NTOHL(header[0]);
- ACE_UINT32 offset = ACE_NTOHL(header[1]);
- ACE_UINT32 message_size = ACE_NTOHL(header[2]);
+ if (this->next () == 0)
+ return 0;
- if (mb->length () + offset > message_size)
- return -1; // Corrupt message?
+ if (data.payload->length () + data.fragment_offset > data.total_size)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "RMCast_Reassembly::put_data - invalid size\n"));
+ return -1; // Corrupt message?
+ }
ACE_RMCast_Partial_Message *message;
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1);
- if (this->messages_.find (message_sequence_number, message) == -1)
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+ if (this->messages_.find (data.sequence_number, message) == -1)
{
ACE_NEW_RETURN (message,
- ACE_RMCast_Partial_Message (message_size),
+ ACE_RMCast_Partial_Message (data.total_size),
-1);
- if (this->messages_.bind (message_sequence_number,
+ if (this->messages_.bind (data.sequence_number,
message) == -1)
return -1; // Internal error?
}
@@ -83,26 +66,45 @@ ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
if (message == 0)
return 0;
- if (message->fragment_received (message_size,
- offset,
- mb) == -1)
- return -1;
+ if (message->fragment_received (data.total_size,
+ data.fragment_offset,
+ data.payload) == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Error in fragment_received\n"));
+ return -1;
+ }
if (!message->is_complete ())
return 0;
// Remove the message from the collection, but leave a marker
// to indicate that it was already received...
- if (this->messages_.rebind (message_sequence_number, 0) == -1)
+ if (this->messages_.rebind (data.sequence_number,
+ (ACE_RMCast_Partial_Message*)0) == -1)
return -1;
}
// Push the message...
- int r = this->put_next (message->message_body (), tv);
+ ACE_RMCast::Data downstream_data;
+ downstream_data.sequence_number = data.sequence_number;
+ downstream_data.total_size = message->message_body ()->length ();
+ downstream_data.fragment_offset = 0;
+ downstream_data.payload = message->message_body ();
+
+ int r = this->next ()->put_data (downstream_data);
delete message;
return r;
}
-#endif /* ACE_RMCAST_REASSEMBLY_C */
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex >;
+template class ACE_Hash_Map_Entry<ACE_UINT32,ACE_RMCast_Partial_Message*>;
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */