diff options
Diffstat (limited to 'ace/RMCast/RMCast_Reassembly.cpp')
-rw-r--r-- | ace/RMCast/RMCast_Reassembly.cpp | 90 |
1 files changed, 46 insertions, 44 deletions
diff --git a/ace/RMCast/RMCast_Reassembly.cpp b/ace/RMCast/RMCast_Reassembly.cpp index be56e6cd9e8..a52791e1ebf 100644 --- a/ace/RMCast/RMCast_Reassembly.cpp +++ b/ace/RMCast/RMCast_Reassembly.cpp @@ -1,34 +1,26 @@ // $Id$ -#ifndef ACE_RMCAST_REASSEMBLY_C -#define ACE_RMCAST_REASSEMBLY_C - #include "RMCast_Reassembly.h" +#include "RMCast_Partial_Message.h" +#include "ace/Message_Block.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "RMCast_Partial_Message.h" - #if !defined (__ACE_INLINE__) #include "RMCast_Reassembly.i" #endif /* __ACE_INLINE__ */ ACE_RCSID(ace, RMCast_Reassembly, "$Id$") - -template <ACE_SYNCH_DECL> -ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: -ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr, - ACE_Message_Queue<ACE_SYNCH_USE> *mq) - : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq) +ACE_RMCast_Reassembly:: +ACE_RMCast_Reassembly (void) + : ACE_RMCast_Module () { } -template <ACE_SYNCH_DECL> -ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: -~ACE_RMCast_Reassembly (void) +ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void) { for (Message_Map_Iterator i = this->messages_.begin (); i != this->messages_.end (); @@ -41,39 +33,30 @@ ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: this->messages_.unbind_all (); } -template <ACE_SYNCH_DECL> int -ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, - ACE_Time_Value *tv) +int +ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data) { - ACE_UINT32 header[3]; - size_t fragment_header_size = sizeof(header); - - if (mb->length () < fragment_header_size) - ACE_ERROR_RETURN ((LM_ERROR, - "Message block too small, " - "not enough room for the header\n"), - -1); - - ACE_OS::memcpy (header, mb->rd_ptr (), fragment_header_size); - - ACE_UINT32 message_sequence_number = ACE_NTOHL(header[0]); - ACE_UINT32 offset = ACE_NTOHL(header[1]); - ACE_UINT32 message_size = ACE_NTOHL(header[2]); + if (this->next () == 0) + return 0; - if (mb->length () + offset > message_size) - return -1; // Corrupt message? + if (data.payload->length () + data.fragment_offset > data.total_size) + { + ACE_DEBUG ((LM_DEBUG, + "RMCast_Reassembly::put_data - invalid size\n")); + return -1; // Corrupt message? + } ACE_RMCast_Partial_Message *message; { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1); - if (this->messages_.find (message_sequence_number, message) == -1) + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); + if (this->messages_.find (data.sequence_number, message) == -1) { ACE_NEW_RETURN (message, - ACE_RMCast_Partial_Message (message_size), + ACE_RMCast_Partial_Message (data.total_size), -1); - if (this->messages_.bind (message_sequence_number, + if (this->messages_.bind (data.sequence_number, message) == -1) return -1; // Internal error? } @@ -83,26 +66,45 @@ ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, if (message == 0) return 0; - if (message->fragment_received (message_size, - offset, - mb) == -1) - return -1; + if (message->fragment_received (data.total_size, + data.fragment_offset, + data.payload) == -1) + { + ACE_DEBUG ((LM_DEBUG, + "Error in fragment_received\n")); + return -1; + } if (!message->is_complete ()) return 0; // Remove the message from the collection, but leave a marker // to indicate that it was already received... - if (this->messages_.rebind (message_sequence_number, 0) == -1) + if (this->messages_.rebind (data.sequence_number, + (ACE_RMCast_Partial_Message*)0) == -1) return -1; } // Push the message... - int r = this->put_next (message->message_body (), tv); + ACE_RMCast::Data downstream_data; + downstream_data.sequence_number = data.sequence_number; + downstream_data.total_size = message->message_body ()->length (); + downstream_data.fragment_offset = 0; + downstream_data.payload = message->message_body (); + + int r = this->next ()->put_data (downstream_data); delete message; return r; } -#endif /* ACE_RMCAST_REASSEMBLY_C */ +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex >; +template class ACE_Hash_Map_Entry<ACE_UINT32,ACE_RMCast_Partial_Message*>; + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |