diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-06-17 00:39:09 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-06-17 00:39:09 +0000 |
commit | 9858d5181a243af42a9a15048e0bf7db4928f94c (patch) | |
tree | 8c598bbe84a843f5eb9b5fd895ba1acafa128f47 /protocols/ace/RMCast | |
parent | 6107fcba7504cca59cc0b9d85286b73507e3897e (diff) | |
download | ATCD-9858d5181a243af42a9a15048e0bf7db4928f94c.tar.gz |
ChangeLogTag:Fri Jun 16 17:30:18 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols/ace/RMCast')
-rw-r--r-- | protocols/ace/RMCast/Makefile | 40 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast.dsp | 151 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast.dsw | 29 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Export.h | 40 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Fragment.cpp | 204 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Fragment.h | 68 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Fragment.i | 8 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Partial_Message.cpp | 180 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Partial_Message.h | 74 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Partial_Message.i | 15 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reassembly.cpp | 108 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reassembly.h | 67 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast_Reassembly.i | 2 |
13 files changed, 986 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile new file mode 100644 index 00000000000..87d640afe74 --- /dev/null +++ b/protocols/ace/RMCast/Makefile @@ -0,0 +1,40 @@ +#---------------------------------------------------------------------------- +# +# $Id$ +# +#---------------------------------------------------------------------------- + +MAKEFILE = Makefile +LIB = libACE_RMCast.a +SHLIB = libACE_RMCast.$(SOEXT) + +FILES= \ + RMCast_Partial_Message +TEMPLATE_FILES = \ + RMCast_Fragment \ + RMCast_Reassembly + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU + +LSRC = $(addsuffix .cpp,$(FILES)) + +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/protocols/ace/RMCast/RMCast.dsp b/protocols/ace/RMCast/RMCast.dsp new file mode 100644 index 00000000000..53fdbf5cb9c --- /dev/null +++ b/protocols/ace/RMCast/RMCast.dsp @@ -0,0 +1,151 @@ +# Microsoft Developer Studio Project File - Name="RMCast" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Dynamic-Link Library" 0x0102
+
+CFG=RMCast - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast.mak" CFG="RMCast - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "RMCast - Win32 Release" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE "RMCast - Win32 Debug" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+MTL=midl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "RMCast - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "DLL\Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MT /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "RMCAST_EXPORTS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\.." /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "ACE_RMCAST_BUILD_DLL" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE MTL /nologo /D "NDEBUG" /mktyplib203 /win32
+# ADD MTL /nologo /D "NDEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /machine:I386
+# ADD LINK32 ace.lib /nologo /dll /machine:I386 /out:"..\..\bin\ACE_RMCast.dll" /libpath:".."
+
+!ELSEIF "$(CFG)" == "RMCast - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "DLL\Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MTd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "RMCAST_EXPORTS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\.." /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "ACE_RMCAST_BUILD_DLL" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE MTL /nologo /D "_DEBUG" /mktyplib203 /win32
+# ADD MTL /nologo /D "_DEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 aced.lib /nologo /dll /debug /machine:I386 /out:"..\..\bin\ACE_RMCastd.dll" /pdbtype:sept /libpath:".."
+
+!ENDIF
+
+# Begin Target
+
+# Name "RMCast - Win32 Release"
+# Name "RMCast - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
+# Begin Source File
+
+SOURCE=.\RMCast_Partial_Message.cpp
+# End Source File
+# End Group
+# Begin Group "Header Files"
+
+# PROP Default_Filter "h;hpp;hxx;hm;inl"
+# Begin Source File
+
+SOURCE=.\RMCast_Export.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Partial_Message.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.h
+# End Source File
+# End Group
+# Begin Group "Inline Files"
+
+# PROP Default_Filter "i"
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Partial_Message.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.i
+# End Source File
+# End Group
+# Begin Group "Template Files"
+
+# PROP Default_Filter ""
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.cpp
+# PROP Exclude_From_Build 1
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.cpp
+# PROP Exclude_From_Build 1
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/protocols/ace/RMCast/RMCast.dsw b/protocols/ace/RMCast/RMCast.dsw new file mode 100644 index 00000000000..a322ca31785 --- /dev/null +++ b/protocols/ace/RMCast/RMCast.dsw @@ -0,0 +1,29 @@ +Microsoft Developer Studio Workspace File, Format Version 6.00
+# WARNING: DO NOT EDIT OR DELETE THIS WORKSPACE FILE!
+
+###############################################################################
+
+Project: "RMCast"=.\RMCast.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
+Global:
+
+Package=<5>
+{{{
+}}}
+
+Package=<3>
+{{{
+}}}
+
+###############################################################################
+
diff --git a/protocols/ace/RMCast/RMCast_Export.h b/protocols/ace/RMCast/RMCast_Export.h new file mode 100644 index 00000000000..665d4ecf229 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Export.h @@ -0,0 +1,40 @@ +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by +// generate_export_file.pl +// ------------------------------ +#if !defined (ACE_RMCAST_EXPORT_H) +#define ACE_RMCAST_EXPORT_H + +#include "ace/config-all.h" + +#if !defined (ACE_RMCAST_HAS_DLL) +#define ACE_RMCAST_HAS_DLL 1 +#endif /* ! ACE_RMCAST_HAS_DLL */ + +#if defined (ACE_RMCAST_HAS_DLL) +# if (ACE_RMCAST_HAS_DLL == 1) +# if defined (ACE_RMCAST_BUILD_DLL) +# define ACE_RMCast_Export ACE_Proper_Export_Flag +# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else +# define ACE_RMCast_Export ACE_Proper_Import_Flag +# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* ACE_RMCAST_BUILD_DLL */ +# else +# define ACE_RMCast_Export +# define ACE_RMCAST_SINGLETON_DECLARATION(T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* ! ACE_RMCAST_HAS_DLL == 1 */ +#else +# define ACE_RMCast_Export +# define ACE_RMCAST_SINGLETON_DECLARATION(T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* ACE_RMCAST_HAS_DLL */ + +#endif /* ACE_RMCAST_EXPORT_H */ + +// End of auto generated file. diff --git a/protocols/ace/RMCast/RMCast_Fragment.cpp b/protocols/ace/RMCast/RMCast_Fragment.cpp new file mode 100644 index 00000000000..0578690f018 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Fragment.cpp @@ -0,0 +1,204 @@ +// $Id$ + +#ifndef ACE_RMCAST_FRAGMENT_C +#define ACE_RMCAST_FRAGMENT_C + +#include "RMCast_Fragment.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if !defined (__ACE_INLINE__) +#include "RMCast_Fragment.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Fragment, "$Id$") + + +template <ACE_SYNCH_DECL> +ACE_RMCast_Fragment<ACE_SYNCH_USE>:: +ACE_RMCast_Fragment (ACE_Thread_Manager *thr_mgr, + ACE_Message_Queue<ACE_SYNCH_USE> *mq) + : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq) + , max_fragment_size_ (ACE_RMCAST_DEFAULT_FRAGMENT_SIZE) +{ +} + +template <ACE_SYNCH_DECL> +ACE_RMCast_Fragment<ACE_SYNCH_USE>:: +~ACE_RMCast_Fragment (void) +{ +} + +template <ACE_SYNCH_DECL> int +ACE_RMCast_Fragment<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ + // @@ We should keep the total size precomputed + size_t total_size = mb->total_size (); + +#if defined (ACE_HAS_BROKEN_DGRAM_SENDV) + const int TAO_WRITEV_MAX = IOV_MAX - 1; +#else + const int TAO_WRITEV_MAX = IOV_MAX; +#endif /* ACE_HAS_BROKEN_DGRAM_SENDV */ + + const size_t max_fragment_payload = this->max_fragment_size_; + + // Iterate over all the message blocks in the chain. If there is + // enough data to send an MTU then it is sent immediately. + // The last fragment is sent with whatever data remains. + // A single fragment can expand multiple message blocks, put + // together in an <iovec> array, it is also possible that a single + // message block requires multiple fragments... so the code below is + // as simple as possible, but not any simpler ;-) + + + // The first piece of each fragment is a header that contains: + // - A sequence number for reassembly, this is unrelated to + // the sequence number for re-transmission. + // NOTE: yes, this increases the bandwidth requires by 4 bytes on + // each message, I don't think this is a big deal. + // - A fragment offset for reassembly. + // - The total size of the message, so the reassembly layer knows + // when a complete message has been received. + + ACE_UINT32 message_sequence_number; + ACE_UINT32 fragment_offset = 0; + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1); + message_sequence_number = ++(this->sequence_number_generator_); + } + + + // The underlying transport layer can only tolerate so many elements + // in a chain, so we must count them and send a fragment if we are + // going over the limit. + + ACE_Message_Block blocks[TAO_WRITEV_MAX]; + + // The first message block contains the fragmentation layer + // header... + ACE_UINT32 header[3]; + header[0] = ACE_HTONL(message_sequence_number); + header[1] = ACE_HTONL(fragment_offset); + header[2] = ACE_HTONL(total_size); + + const size_t fragment_header_size = sizeof(header); + + blocks[0].init (ACE_reinterpret_cast(char*,header), + fragment_header_size); + blocks[0].wr_ptr (fragment_header_size); + + // How many elements of the <blocks> array are in use... + int iovcnt = 1; + + // The size of the current message, adding the size of all its + // message blocks. + size_t fragment_size = fragment_header_size; + + for (ACE_Message_Block* b = mb; b != 0; b = b->cont ()) + { + // Add the block to the vector... + + ACE_Message_Block *last_block = blocks + iovcnt; + + last_block->data_block (b->data_block ()->duplicate ()); + last_block->rd_ptr (b->rd_ptr ()); + last_block->wr_ptr (b->wr_ptr ()); + last_block->cont (0); + // Set the continuation field + blocks[iovcnt - 1].cont (last_block); + + size_t last_block_length = last_block->length (); + + // Recompute the state of the fragment + fragment_size += last_block_length; + iovcnt++; + + while (fragment_size >= max_fragment_payload) + { + // We have filled a fragment. It is possible that we need + // to split the last message block in multiple fragments, + // thus the loop above... + + // First adjust the last message block to exactly fit in the + // fragment: + size_t last_sent_mb_len = + max_fragment_payload - (fragment_size - last_block_length); + + // Send only enough data of the last message block to fill + // the fragment... + last_block->wr_ptr (last_block->rd_ptr () + + last_sent_mb_len); + + if (this->put_next (blocks, tv) == -1) + return -1; + + // adjust the offset + fragment_offset += max_fragment_payload - fragment_header_size; + header[1] = ACE_HTONL(fragment_offset); + + // Now compute how much data is left in the last message + // block, to check if we should continue sending it... + last_block_length -= last_sent_mb_len; + if (last_block_length == 0) + { + // No more data from this message block, just continue + // the outer loop... + iovcnt = 1; + fragment_size = fragment_header_size; + blocks[0].cont (0); + break; // while + } + + // There is some data left, we try to send it in a single + // fragment, if it is still too big the beginning of this + // loop will adjust things. + + // We must put the data in the right place in the array.. + char *rd_ptr = last_block->rd_ptr () + last_sent_mb_len; + char *wr_ptr = rd_ptr + last_block_length; + blocks[1].data_block (last_block->replace_data_block (0)); + + // And determine what segment of the data will be sent.. + blocks[1].rd_ptr (rd_ptr); + blocks[1].wr_ptr (wr_ptr); + blocks[1].cont (0); + last_block = &blocks[1]; + + // Setup the cont field... + blocks[0].cont (last_block); + + // Adjust the state of the fragment + fragment_size = last_block_length + fragment_header_size; + iovcnt = 2; + + // Notice that if <fragment_size> is too big the start of + // this loop will continue the fragmentation. + } + + // It is also possible to fill up the iovec array before the + // fragment is completed, in this case we must send whatever we + // have: + if (iovcnt == TAO_WRITEV_MAX) + { + if (this->put_next (blocks, tv) == -1) + return -1; + + fragment_offset += fragment_size - fragment_header_size; + header[1] = ACE_HTONL(fragment_offset); + iovcnt = 1; + fragment_size = fragment_header_size; + blocks[0].cont (0); + } + } + + if (iovcnt == 1) + return 0; + + return this->put_next (blocks, tv); +} + +#endif /* ACE_RMCAST_FRAGMENT_C */ diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h new file mode 100644 index 00000000000..8aa7024770e --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Fragment.h @@ -0,0 +1,68 @@ +// $Id$ + +// ============================================================================ +// +// = DESCRIPTION +// The fragmentation task for the reliable multicast library +// +// = AUTHOR +// Carlos O'Ryan <coryan@cs.wustl.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_FRAGMENT_H +#define ACE_RMCAST_FRAGMENT_H +#include "ace/pre.h" + +#include "RMCast_Export.h" +#include "ace/Task.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE +# define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024 +#endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */ + +template <ACE_SYNCH_DECL> +class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_Task<ACE_SYNCH_USE> +{ +public: + ACE_RMCast_Fragment (ACE_Thread_Manager *thr_mgr = 0, + ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0); + // Constructor + + virtual ~ACE_RMCast_Fragment (void); + // Destructor + + size_t max_fragment_size (void) const; + // Accessor for the max_fragment size. + // There is no modifier, the maximum fragment size is obtained using + // feedback from the lower layer (transport?) + + // = The ACE_Task methods + int put (ACE_Message_Block *, ACE_Time_Value *timeout = 0); + +private: + size_t max_fragment_size_; + + ACE_SYNCH_MUTEX_T mutex_; + ACE_UINT32 sequence_number_generator_; + // The sequence number generator +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Fragment.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Fragment.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Fragment.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_FRAGMENT_H */ diff --git a/protocols/ace/RMCast/RMCast_Fragment.i b/protocols/ace/RMCast/RMCast_Fragment.i new file mode 100644 index 00000000000..0cf9f7eea9b --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Fragment.i @@ -0,0 +1,8 @@ +// $Id$ + +template <ACE_SYNCH_DECL> ACE_INLINE size_t +ACE_RMCast_Fragment<ACE_SYNCH_USE>::max_fragment_size (void) const +{ + return this->max_fragment_size_; +} + diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.cpp b/protocols/ace/RMCast/RMCast_Partial_Message.cpp new file mode 100644 index 00000000000..054619d6998 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Partial_Message.cpp @@ -0,0 +1,180 @@ +// $Id$ + +#include "RMCast_Partial_Message.h" + +#if !defined (__ACE_INLINE__) +#include "RMCast_Partial_Message.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Partial_Message, "$Id$") + +ACE_RMCast_Partial_Message:: +ACE_RMCast_Partial_Message (ACE_UINT32 message_size) + : max_hole_count_ (ACE_RMCAST_DEFAULT_HOLE_COUNT), + hole_count_ (1) +{ + ACE_NEW (this->hole_list_, + ACE_RMCast_Partial_Message::Hole[this->max_hole_count_]); + this->hole_list_[0].start = 0; + this->hole_list_[0].end = message_size; + + this->message_body_.size (message_size); + this->message_body_.wr_ptr (message_size); +} + +ACE_RMCast_Partial_Message:: +~ACE_RMCast_Partial_Message (void) +{ + delete[] this->hole_list_; +} + +int +ACE_RMCast_Partial_Message::fragment_received (ACE_UINT32 message_size, + ACE_UINT32 offset, + ACE_Message_Block *mb) +{ + // Just copy the data... + char *rd_ptr = this->message_body_.rd_ptr () + offset; + size_t total_length = mb->length () - 12; + if (total_length > 0) + { + ACE_OS::memcpy (rd_ptr, + mb->rd_ptr () + 12, + total_length); + rd_ptr += total_length; + } + { + for (const ACE_Message_Block *i = mb->cont (); i != 0; i = i->cont ()) + { + ACE_OS::memcpy (rd_ptr, i->rd_ptr (), i->length ()); + rd_ptr += i->length (); + total_length += i->length (); + } + } + + // The algorithm works like this: + // + // For each hole we determine if there is an intersection between + // the hole and the incoming fragment. If there is none we do + // nothing (actually since the holes are ordered we can stop the + // iteration if the + + ACE_UINT32 start = offset; + ACE_UINT32 end = offset + total_length; + + for (size_t i = 0; i < this->hole_count_; ++i) + { + Hole& hole = this->hole_list_[i]; + + if (end <= hole.start) + return 0; + if (start >= hole.end) + continue; + + // OK there is some intersection. + + // There are only three cases for the <start> value: + // start < hole.start + // start == hole.start + // hole.start < start < hole.end + // + // But the code for both start == hole.start and start < + // hole.start is identical.... + + if (start <= hole.start) + { + if (end < hole.end) + { + // In this case we shrink the hole + hole.start = end; + return 0; + } + else // end >= hole.end + { + // We remove the hole, and continue the iteration... + if (this->remove_hole (i) == -1) + return -1; + continue; + } + } + else // hole.start < start < hole.end + { + if (end >= hole.end) + { + // Just adjust the size of the hole... + hole.start = start; + return 0; + } + else // if (end < hole.end) + { + // Nasty, we need to insert a new hole... + if (this->insert_hole (i, end, hole.end) == -1) + return -1; + // and change the old hole... + // NOTE: we have to refetch it because the array may + // have been reallocated! + this->hole_list_[i].end = start; + continue; + } + } + } + return 0; + // @@ OLD COMMENTS, the algorithm has changed since! + // There are several cases: + // + // 1) The fragment is completely contained in data already received, + // nothing changes in this case. + // + // 2) Part of the fragment is contained in data already received and + // part is new data: + // 2.1) The new data closes a hole, remove it from the list + // 2.2) The beginning of the new fragment is the new data, reduce + // the size of the hole + // 2.3) The end of the new fragment is the new data, increment + // the size of the received block + // + // 3) The fragment is completely contained in a hole + // 3.1) It closes the hole, remove it from the list + // 3.2) It starts at the beginning of a hole, grow the received + // block + // 3.3) It ends at the end of a hole, reduce the hole size + // 3.4) It is in the middle of a hole, insert a new hole + // +} + +int +ACE_RMCast_Partial_Message::insert_hole (size_t i, + ACE_UINT32 start, + ACE_UINT32 end) +{ + if (this->hole_count_ == this->max_hole_count_) + { + this->max_hole_count_ *= 2; + Hole *tmp; + ACE_NEW_RETURN (tmp, Hole[this->max_hole_count_], -1); + for (size_t j = 0; j != this->hole_count_; ++j) + { + tmp[j] = this->hole_list_[j]; + } + } + for (size_t j = this->hole_count_; j != i + 1; --j) + { + this->hole_list_[j] = this->hole_list_[j - 1]; + } + + this->hole_list_[i + 1].start = start; + this->hole_list_[i + 1].end = end; + this->hole_count_++; + + return 0; +} + +int +ACE_RMCast_Partial_Message::remove_hole (size_t i) +{ + for (size_t j = i; j != this->hole_count_ - 1; ++j) + this->hole_list_[j] = this->hole_list_[j + 1]; + + this->hole_count_--; + return 0; +} diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.h b/protocols/ace/RMCast/RMCast_Partial_Message.h new file mode 100644 index 00000000000..af66435825a --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Partial_Message.h @@ -0,0 +1,74 @@ +// $Id$ + +// ============================================================================ +// +// = DESCRIPTION +// Helper class used in the reassembly layer of the realiable +// multicast library. +// +// = AUTHOR +// Carlos O'Ryan <coryan@cs.wustl.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_PARTIAL_MESSAGE_H +#define ACE_RMCAST_PARTIAL_MESSAGE_H +#include "ace/pre.h" + +#include "RMCast_Export.h" +#include "ace/Task.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#ifndef ACE_RMCAST_DEFAULT_HOLE_COUNT +#define ACE_RMCAST_DEFAULT_HOLE_COUNT 16 +#endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */ + +class ACE_RMCast_Export ACE_RMCast_Partial_Message +{ +public: + ACE_RMCast_Partial_Message (ACE_UINT32 message_size); + ~ACE_RMCast_Partial_Message (void); + + int fragment_received (ACE_UINT32 message_size, + ACE_UINT32 offset, + ACE_Message_Block *mb); + int is_complete (void) const; + + ACE_Message_Block *message_body (void); + // Return the body of the message, the memory is owned by the + // class. + +private: + int insert_hole (size_t i, + ACE_UINT32 start, + ACE_UINT32 end); + // Insert a new hole into the list + + int remove_hole (size_t i); + // Remove a hole from the list + +private: + ACE_Message_Block message_body_; + // Used to rebuild the body of the message + + struct Hole + { + ACE_UINT32 start; + ACE_UINT32 end; + }; + + Hole *hole_list_; + size_t max_hole_count_; + size_t hole_count_; + // The current list of holes in the message_body. +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Partial_Message.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_PARTIAL_MESSAGE_H */ diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.i b/protocols/ace/RMCast/RMCast_Partial_Message.i new file mode 100644 index 00000000000..7be89aa1932 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Partial_Message.i @@ -0,0 +1,15 @@ +// $Id$ + +ACE_INLINE int +ACE_RMCast_Partial_Message::is_complete (void) const +{ + return (this->hole_count_ == 0) + || (this->hole_count_ == 1 + && this->hole_list_[0].start == this->hole_list_[0].end); +} + +ACE_INLINE ACE_Message_Block * +ACE_RMCast_Partial_Message::message_body (void) +{ + return &this->message_body_; +} diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp new file mode 100644 index 00000000000..be56e6cd9e8 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp @@ -0,0 +1,108 @@ +// $Id$ + +#ifndef ACE_RMCAST_REASSEMBLY_C +#define ACE_RMCAST_REASSEMBLY_C + +#include "RMCast_Reassembly.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "RMCast_Partial_Message.h" + +#if !defined (__ACE_INLINE__) +#include "RMCast_Reassembly.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Reassembly, "$Id$") + + +template <ACE_SYNCH_DECL> +ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: +ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr, + ACE_Message_Queue<ACE_SYNCH_USE> *mq) + : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq) +{ +} + +template <ACE_SYNCH_DECL> +ACE_RMCast_Reassembly<ACE_SYNCH_USE>:: +~ACE_RMCast_Reassembly (void) +{ + for (Message_Map_Iterator i = this->messages_.begin (); + i != this->messages_.end (); + ++i) + { + ACE_RMCast_Partial_Message *message = (*i).int_id_; + if (message != 0) + delete message; + } + this->messages_.unbind_all (); +} + +template <ACE_SYNCH_DECL> int +ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) +{ + ACE_UINT32 header[3]; + size_t fragment_header_size = sizeof(header); + + if (mb->length () < fragment_header_size) + ACE_ERROR_RETURN ((LM_ERROR, + "Message block too small, " + "not enough room for the header\n"), + -1); + + ACE_OS::memcpy (header, mb->rd_ptr (), fragment_header_size); + + ACE_UINT32 message_sequence_number = ACE_NTOHL(header[0]); + ACE_UINT32 offset = ACE_NTOHL(header[1]); + ACE_UINT32 message_size = ACE_NTOHL(header[2]); + + if (mb->length () + offset > message_size) + return -1; // Corrupt message? + + ACE_RMCast_Partial_Message *message; + + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1); + if (this->messages_.find (message_sequence_number, message) == -1) + { + ACE_NEW_RETURN (message, + ACE_RMCast_Partial_Message (message_size), + -1); + + if (this->messages_.bind (message_sequence_number, + message) == -1) + return -1; // Internal error? + } + + // The message was in the collection, but it has been received + // already, this is a duplicate fragment, just drop it. + if (message == 0) + return 0; + + if (message->fragment_received (message_size, + offset, + mb) == -1) + return -1; + + if (!message->is_complete ()) + return 0; + + // Remove the message from the collection, but leave a marker + // to indicate that it was already received... + if (this->messages_.rebind (message_sequence_number, 0) == -1) + return -1; + } + + // Push the message... + int r = this->put_next (message->message_body (), tv); + + delete message; + + return r; +} + +#endif /* ACE_RMCAST_REASSEMBLY_C */ diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h new file mode 100644 index 00000000000..456c060948a --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reassembly.h @@ -0,0 +1,67 @@ +// $Id$ + +// ============================================================================ +// +// = DESCRIPTION +// The reassembly task for the reliable multicast library +// +// = AUTHOR +// Carlos O'Ryan <coryan@cs.wustl.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_REASSEMBLY_H +#define ACE_RMCAST_REASSEMBLY_H +#include "ace/pre.h" + +#include "RMCast_Export.h" +#include "ace/Task.h" +#include "ace/Hash_Map_Manager.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_RMCast_Partial_Message; + +template <ACE_SYNCH_DECL> +class ACE_RMCast_Export ACE_RMCast_Reassembly : public ACE_Task<ACE_SYNCH_USE> +{ +public: + ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr = 0, + ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0); + // Constructor + + virtual ~ACE_RMCast_Reassembly (void); + // Destructor + + // = The ACE_Task methods + int put (ACE_Message_Block *, ACE_Time_Value *timeout = 0); + +private: + ACE_SYNCH_MUTEX_T mutex_; + typedef + ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex> + Message_Map; + typedef + ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex> + Message_Map_Iterator; + + Message_Map messages_; + // The array of partially received messages +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Reassembly.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Reassembly.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Reassembly.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_REASSEMBLY_H */ diff --git a/protocols/ace/RMCast/RMCast_Reassembly.i b/protocols/ace/RMCast/RMCast_Reassembly.i new file mode 100644 index 00000000000..74e88caa0c5 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reassembly.i @@ -0,0 +1,2 @@ +// $Id$ + |