diff options
Diffstat (limited to 'protocols/ace/RMCast/RMCast_Reassembly.cpp')
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reassembly.cpp | 90 |
1 files changed, 44 insertions, 46 deletions
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp index ba2e9b79c1a..be56e6cd9e8 100644 --- a/protocols/ace/RMCast/RMCast_Reassembly.cpp +++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp @@ -1,26 +1,34 @@ // $Id$ +#ifndef ACE_RMCAST_REASSEMBLY_C +#define ACE_RMCAST_REASSEMBLY_C + #include "RMCast_Reassembly.h" -#include "RMCast_Partial_Message.h" -#include "ace/Message_Block.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "RMCast_Partial_Message.h" + #if !defined (__ACE_INLINE__) #include "RMCast_Reassembly.i" #endif /* __ACE_INLINE__ */ ACE_RCSID(ace, RMCast_Reassembly, "$Id$") -ACE_RMCast_Reassembly:: -ACE_RMCast_Reassembly (void) - : ACE_RMCast_Module () + +template <ACE_SYNCH_DECL> +ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: +ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr, + ACE_Message_Queue<ACE_SYNCH_USE> *mq) + : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq) { } -ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void) +template <ACE_SYNCH_DECL> +ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: +~ACE_RMCast_Reassembly (void) { for (Message_Map_Iterator i = this->messages_.begin (); i != this->messages_.end (); @@ -33,30 +41,39 @@ ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void) this->messages_.unbind_all (); } -int -ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data) +template <ACE_SYNCH_DECL> int +ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { - if (this->next () == 0) - return 0; + ACE_UINT32 header[3]; + size_t fragment_header_size = sizeof(header); - if (data.payload->length () + data.fragment_offset > data.total_size) - { - ACE_DEBUG ((LM_DEBUG, - "RMCast_Reassembly::data - invalid size\n")); - return -1; // Corrupt message? - } + if (mb->length () < fragment_header_size) + ACE_ERROR_RETURN ((LM_ERROR, + "Message block too small, " + "not enough room for the header\n"), + -1); + + ACE_OS::memcpy (header, mb->rd_ptr (), fragment_header_size); + + ACE_UINT32 message_sequence_number = ACE_NTOHL(header[0]); + ACE_UINT32 offset = ACE_NTOHL(header[1]); + ACE_UINT32 message_size = ACE_NTOHL(header[2]); + + if (mb->length () + offset > message_size) + return -1; // Corrupt message? ACE_RMCast_Partial_Message *message; { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - if (this->messages_.find (data.sequence_number, message) == -1) + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1); + if (this->messages_.find (message_sequence_number, message) == -1) { ACE_NEW_RETURN (message, - ACE_RMCast_Partial_Message (data.total_size), + ACE_RMCast_Partial_Message (message_size), -1); - if (this->messages_.bind (data.sequence_number, + if (this->messages_.bind (message_sequence_number, message) == -1) return -1; // Internal error? } @@ -66,45 +83,26 @@ ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data) if (message == 0) return 0; - if (message->fragment_received (data.total_size, - data.fragment_offset, - data.payload) == -1) - { - ACE_DEBUG ((LM_DEBUG, - "Error in fragment_received\n")); - return -1; - } + if (message->fragment_received (message_size, + offset, + mb) == -1) + return -1; if (!message->is_complete ()) return 0; // Remove the message from the collection, but leave a marker // to indicate that it was already received... - if (this->messages_.rebind (data.sequence_number, - (ACE_RMCast_Partial_Message*)0) == -1) + if (this->messages_.rebind (message_sequence_number, 0) == -1) return -1; } // Push the message... - ACE_RMCast::Data downstream_data; - downstream_data.sequence_number = data.sequence_number; - downstream_data.total_size = message->message_body ()->length (); - downstream_data.fragment_offset = 0; - downstream_data.payload = message->message_body (); - - int r = this->next ()->data (downstream_data); + int r = this->put_next (message->message_body (), tv); delete message; return r; } -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -template class ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Manager_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator_Base_Ex<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Hash<ACE_UINT32>,ACE_Equal_To<ACE_UINT32>,ACE_Null_Mutex >; -template class ACE_Hash_Map_Entry<ACE_UINT32,ACE_RMCast_Partial_Message*>; - -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ +#endif /* ACE_RMCAST_REASSEMBLY_C */ |