summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-06-17 00:39:09 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-06-17 00:39:09 +0000
commit9858d5181a243af42a9a15048e0bf7db4928f94c (patch)
tree8c598bbe84a843f5eb9b5fd895ba1acafa128f47 /protocols/ace/RMCast
parent6107fcba7504cca59cc0b9d85286b73507e3897e (diff)
downloadATCD-9858d5181a243af42a9a15048e0bf7db4928f94c.tar.gz
ChangeLogTag:Fri Jun 16 17:30:18 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols/ace/RMCast')
-rw-r--r--protocols/ace/RMCast/Makefile40
-rw-r--r--protocols/ace/RMCast/RMCast.dsp151
-rw-r--r--protocols/ace/RMCast/RMCast.dsw29
-rw-r--r--protocols/ace/RMCast/RMCast_Export.h40
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.cpp204
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.h68
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.i8
-rw-r--r--protocols/ace/RMCast/RMCast_Partial_Message.cpp180
-rw-r--r--protocols/ace/RMCast/RMCast_Partial_Message.h74
-rw-r--r--protocols/ace/RMCast/RMCast_Partial_Message.i15
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.cpp108
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.h67
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.i2
13 files changed, 986 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile
new file mode 100644
index 00000000000..87d640afe74
--- /dev/null
+++ b/protocols/ace/RMCast/Makefile
@@ -0,0 +1,40 @@
+#----------------------------------------------------------------------------
+#
+# $Id$
+#
+#----------------------------------------------------------------------------
+
+MAKEFILE = Makefile
+LIB = libACE_RMCast.a
+SHLIB = libACE_RMCast.$(SOEXT)
+
+FILES= \
+ RMCast_Partial_Message
+TEMPLATE_FILES = \
+ RMCast_Fragment \
+ RMCast_Reassembly
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+
+LSRC = $(addsuffix .cpp,$(FILES))
+
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/protocols/ace/RMCast/RMCast.dsp b/protocols/ace/RMCast/RMCast.dsp
new file mode 100644
index 00000000000..53fdbf5cb9c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast.dsp
@@ -0,0 +1,151 @@
+# Microsoft Developer Studio Project File - Name="RMCast" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Dynamic-Link Library" 0x0102
+
+CFG=RMCast - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast.mak" CFG="RMCast - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "RMCast - Win32 Release" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE "RMCast - Win32 Debug" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+MTL=midl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "RMCast - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "DLL\Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MT /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "RMCAST_EXPORTS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\.." /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "ACE_RMCAST_BUILD_DLL" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE MTL /nologo /D "NDEBUG" /mktyplib203 /win32
+# ADD MTL /nologo /D "NDEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /machine:I386
+# ADD LINK32 ace.lib /nologo /dll /machine:I386 /out:"..\..\bin\ACE_RMCast.dll" /libpath:".."
+
+!ELSEIF "$(CFG)" == "RMCast - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir ""
+# PROP Intermediate_Dir "DLL\Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MTd /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "RMCAST_EXPORTS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\.." /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "_MBCS" /D "_USRDLL" /D "ACE_RMCAST_BUILD_DLL" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE MTL /nologo /D "_DEBUG" /mktyplib203 /win32
+# ADD MTL /nologo /D "_DEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /dll /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 aced.lib /nologo /dll /debug /machine:I386 /out:"..\..\bin\ACE_RMCastd.dll" /pdbtype:sept /libpath:".."
+
+!ENDIF
+
+# Begin Target
+
+# Name "RMCast - Win32 Release"
+# Name "RMCast - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
+# Begin Source File
+
+SOURCE=.\RMCast_Partial_Message.cpp
+# End Source File
+# End Group
+# Begin Group "Header Files"
+
+# PROP Default_Filter "h;hpp;hxx;hm;inl"
+# Begin Source File
+
+SOURCE=.\RMCast_Export.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Partial_Message.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.h
+# End Source File
+# End Group
+# Begin Group "Inline Files"
+
+# PROP Default_Filter "i"
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Partial_Message.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.i
+# End Source File
+# End Group
+# Begin Group "Template Files"
+
+# PROP Default_Filter ""
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.cpp
+# PROP Exclude_From_Build 1
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.cpp
+# PROP Exclude_From_Build 1
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/protocols/ace/RMCast/RMCast.dsw b/protocols/ace/RMCast/RMCast.dsw
new file mode 100644
index 00000000000..a322ca31785
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast.dsw
@@ -0,0 +1,29 @@
+Microsoft Developer Studio Workspace File, Format Version 6.00
+# WARNING: DO NOT EDIT OR DELETE THIS WORKSPACE FILE!
+
+###############################################################################
+
+Project: "RMCast"=.\RMCast.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
+Global:
+
+Package=<5>
+{{{
+}}}
+
+Package=<3>
+{{{
+}}}
+
+###############################################################################
+
diff --git a/protocols/ace/RMCast/RMCast_Export.h b/protocols/ace/RMCast/RMCast_Export.h
new file mode 100644
index 00000000000..665d4ecf229
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Export.h
@@ -0,0 +1,40 @@
+// -*- C++ -*-
+// $Id$
+// Definition for Win32 Export directives.
+// This file is generated automatically by
+// generate_export_file.pl
+// ------------------------------
+#if !defined (ACE_RMCAST_EXPORT_H)
+#define ACE_RMCAST_EXPORT_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_RMCAST_HAS_DLL)
+#define ACE_RMCAST_HAS_DLL 1
+#endif /* ! ACE_RMCAST_HAS_DLL */
+
+#if defined (ACE_RMCAST_HAS_DLL)
+# if (ACE_RMCAST_HAS_DLL == 1)
+# if defined (ACE_RMCAST_BUILD_DLL)
+# define ACE_RMCast_Export ACE_Proper_Export_Flag
+# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T)
+# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# else
+# define ACE_RMCast_Export ACE_Proper_Import_Flag
+# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T)
+# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# endif /* ACE_RMCAST_BUILD_DLL */
+# else
+# define ACE_RMCast_Export
+# define ACE_RMCAST_SINGLETON_DECLARATION(T)
+# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+# endif /* ! ACE_RMCAST_HAS_DLL == 1 */
+#else
+# define ACE_RMCast_Export
+# define ACE_RMCAST_SINGLETON_DECLARATION(T)
+# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK)
+#endif /* ACE_RMCAST_HAS_DLL */
+
+#endif /* ACE_RMCAST_EXPORT_H */
+
+// End of auto generated file.
diff --git a/protocols/ace/RMCast/RMCast_Fragment.cpp b/protocols/ace/RMCast/RMCast_Fragment.cpp
new file mode 100644
index 00000000000..0578690f018
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Fragment.cpp
@@ -0,0 +1,204 @@
+// $Id$
+
+#ifndef ACE_RMCAST_FRAGMENT_C
+#define ACE_RMCAST_FRAGMENT_C
+
+#include "RMCast_Fragment.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#if !defined (__ACE_INLINE__)
+#include "RMCast_Fragment.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Fragment, "$Id$")
+
+
+template <ACE_SYNCH_DECL>
+ACE_RMCast_Fragment<ACE_SYNCH_USE>::
+ACE_RMCast_Fragment (ACE_Thread_Manager *thr_mgr,
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq)
+ : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq)
+ , max_fragment_size_ (ACE_RMCAST_DEFAULT_FRAGMENT_SIZE)
+{
+}
+
+template <ACE_SYNCH_DECL>
+ACE_RMCast_Fragment<ACE_SYNCH_USE>::
+~ACE_RMCast_Fragment (void)
+{
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_RMCast_Fragment<ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
+{
+ // @@ We should keep the total size precomputed
+ size_t total_size = mb->total_size ();
+
+#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
+ const int TAO_WRITEV_MAX = IOV_MAX - 1;
+#else
+ const int TAO_WRITEV_MAX = IOV_MAX;
+#endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
+
+ const size_t max_fragment_payload = this->max_fragment_size_;
+
+ // Iterate over all the message blocks in the chain. If there is
+ // enough data to send an MTU then it is sent immediately.
+ // The last fragment is sent with whatever data remains.
+ // A single fragment can expand multiple message blocks, put
+ // together in an <iovec> array, it is also possible that a single
+ // message block requires multiple fragments... so the code below is
+ // as simple as possible, but not any simpler ;-)
+
+
+ // The first piece of each fragment is a header that contains:
+ // - A sequence number for reassembly, this is unrelated to
+ // the sequence number for re-transmission.
+ // NOTE: yes, this increases the bandwidth requires by 4 bytes on
+ // each message, I don't think this is a big deal.
+ // - A fragment offset for reassembly.
+ // - The total size of the message, so the reassembly layer knows
+ // when a complete message has been received.
+
+ ACE_UINT32 message_sequence_number;
+ ACE_UINT32 fragment_offset = 0;
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1);
+ message_sequence_number = ++(this->sequence_number_generator_);
+ }
+
+
+ // The underlying transport layer can only tolerate so many elements
+ // in a chain, so we must count them and send a fragment if we are
+ // going over the limit.
+
+ ACE_Message_Block blocks[TAO_WRITEV_MAX];
+
+ // The first message block contains the fragmentation layer
+ // header...
+ ACE_UINT32 header[3];
+ header[0] = ACE_HTONL(message_sequence_number);
+ header[1] = ACE_HTONL(fragment_offset);
+ header[2] = ACE_HTONL(total_size);
+
+ const size_t fragment_header_size = sizeof(header);
+
+ blocks[0].init (ACE_reinterpret_cast(char*,header),
+ fragment_header_size);
+ blocks[0].wr_ptr (fragment_header_size);
+
+ // How many elements of the <blocks> array are in use...
+ int iovcnt = 1;
+
+ // The size of the current message, adding the size of all its
+ // message blocks.
+ size_t fragment_size = fragment_header_size;
+
+ for (ACE_Message_Block* b = mb; b != 0; b = b->cont ())
+ {
+ // Add the block to the vector...
+
+ ACE_Message_Block *last_block = blocks + iovcnt;
+
+ last_block->data_block (b->data_block ()->duplicate ());
+ last_block->rd_ptr (b->rd_ptr ());
+ last_block->wr_ptr (b->wr_ptr ());
+ last_block->cont (0);
+ // Set the continuation field
+ blocks[iovcnt - 1].cont (last_block);
+
+ size_t last_block_length = last_block->length ();
+
+ // Recompute the state of the fragment
+ fragment_size += last_block_length;
+ iovcnt++;
+
+ while (fragment_size >= max_fragment_payload)
+ {
+ // We have filled a fragment. It is possible that we need
+ // to split the last message block in multiple fragments,
+ // thus the loop above...
+
+ // First adjust the last message block to exactly fit in the
+ // fragment:
+ size_t last_sent_mb_len =
+ max_fragment_payload - (fragment_size - last_block_length);
+
+ // Send only enough data of the last message block to fill
+ // the fragment...
+ last_block->wr_ptr (last_block->rd_ptr ()
+ + last_sent_mb_len);
+
+ if (this->put_next (blocks, tv) == -1)
+ return -1;
+
+ // adjust the offset
+ fragment_offset += max_fragment_payload - fragment_header_size;
+ header[1] = ACE_HTONL(fragment_offset);
+
+ // Now compute how much data is left in the last message
+ // block, to check if we should continue sending it...
+ last_block_length -= last_sent_mb_len;
+ if (last_block_length == 0)
+ {
+ // No more data from this message block, just continue
+ // the outer loop...
+ iovcnt = 1;
+ fragment_size = fragment_header_size;
+ blocks[0].cont (0);
+ break; // while
+ }
+
+ // There is some data left, we try to send it in a single
+ // fragment, if it is still too big the beginning of this
+ // loop will adjust things.
+
+ // We must put the data in the right place in the array..
+ char *rd_ptr = last_block->rd_ptr () + last_sent_mb_len;
+ char *wr_ptr = rd_ptr + last_block_length;
+ blocks[1].data_block (last_block->replace_data_block (0));
+
+ // And determine what segment of the data will be sent..
+ blocks[1].rd_ptr (rd_ptr);
+ blocks[1].wr_ptr (wr_ptr);
+ blocks[1].cont (0);
+ last_block = &blocks[1];
+
+ // Setup the cont field...
+ blocks[0].cont (last_block);
+
+ // Adjust the state of the fragment
+ fragment_size = last_block_length + fragment_header_size;
+ iovcnt = 2;
+
+ // Notice that if <fragment_size> is too big the start of
+ // this loop will continue the fragmentation.
+ }
+
+ // It is also possible to fill up the iovec array before the
+ // fragment is completed, in this case we must send whatever we
+ // have:
+ if (iovcnt == TAO_WRITEV_MAX)
+ {
+ if (this->put_next (blocks, tv) == -1)
+ return -1;
+
+ fragment_offset += fragment_size - fragment_header_size;
+ header[1] = ACE_HTONL(fragment_offset);
+ iovcnt = 1;
+ fragment_size = fragment_header_size;
+ blocks[0].cont (0);
+ }
+ }
+
+ if (iovcnt == 1)
+ return 0;
+
+ return this->put_next (blocks, tv);
+}
+
+#endif /* ACE_RMCAST_FRAGMENT_C */
diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h
new file mode 100644
index 00000000000..8aa7024770e
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Fragment.h
@@ -0,0 +1,68 @@
+// $Id$
+
+// ============================================================================
+//
+// = DESCRIPTION
+// The fragmentation task for the reliable multicast library
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_FRAGMENT_H
+#define ACE_RMCAST_FRAGMENT_H
+#include "ace/pre.h"
+
+#include "RMCast_Export.h"
+#include "ace/Task.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE
+# define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024
+#endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */
+
+template <ACE_SYNCH_DECL>
+class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_Task<ACE_SYNCH_USE>
+{
+public:
+ ACE_RMCast_Fragment (ACE_Thread_Manager *thr_mgr = 0,
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0);
+ // Constructor
+
+ virtual ~ACE_RMCast_Fragment (void);
+ // Destructor
+
+ size_t max_fragment_size (void) const;
+ // Accessor for the max_fragment size.
+ // There is no modifier, the maximum fragment size is obtained using
+ // feedback from the lower layer (transport?)
+
+ // = The ACE_Task methods
+ int put (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
+
+private:
+ size_t max_fragment_size_;
+
+ ACE_SYNCH_MUTEX_T mutex_;
+ ACE_UINT32 sequence_number_generator_;
+ // The sequence number generator
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Fragment.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Fragment.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Fragment.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_FRAGMENT_H */
diff --git a/protocols/ace/RMCast/RMCast_Fragment.i b/protocols/ace/RMCast/RMCast_Fragment.i
new file mode 100644
index 00000000000..0cf9f7eea9b
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Fragment.i
@@ -0,0 +1,8 @@
+// $Id$
+
+template <ACE_SYNCH_DECL> ACE_INLINE size_t
+ACE_RMCast_Fragment<ACE_SYNCH_USE>::max_fragment_size (void) const
+{
+ return this->max_fragment_size_;
+}
+
diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.cpp b/protocols/ace/RMCast/RMCast_Partial_Message.cpp
new file mode 100644
index 00000000000..054619d6998
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Partial_Message.cpp
@@ -0,0 +1,180 @@
+// $Id$
+
+#include "RMCast_Partial_Message.h"
+
+#if !defined (__ACE_INLINE__)
+#include "RMCast_Partial_Message.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Partial_Message, "$Id$")
+
+ACE_RMCast_Partial_Message::
+ACE_RMCast_Partial_Message (ACE_UINT32 message_size)
+ : max_hole_count_ (ACE_RMCAST_DEFAULT_HOLE_COUNT),
+ hole_count_ (1)
+{
+ ACE_NEW (this->hole_list_,
+ ACE_RMCast_Partial_Message::Hole[this->max_hole_count_]);
+ this->hole_list_[0].start = 0;
+ this->hole_list_[0].end = message_size;
+
+ this->message_body_.size (message_size);
+ this->message_body_.wr_ptr (message_size);
+}
+
+ACE_RMCast_Partial_Message::
+~ACE_RMCast_Partial_Message (void)
+{
+ delete[] this->hole_list_;
+}
+
+int
+ACE_RMCast_Partial_Message::fragment_received (ACE_UINT32 message_size,
+ ACE_UINT32 offset,
+ ACE_Message_Block *mb)
+{
+ // Just copy the data...
+ char *rd_ptr = this->message_body_.rd_ptr () + offset;
+ size_t total_length = mb->length () - 12;
+ if (total_length > 0)
+ {
+ ACE_OS::memcpy (rd_ptr,
+ mb->rd_ptr () + 12,
+ total_length);
+ rd_ptr += total_length;
+ }
+ {
+ for (const ACE_Message_Block *i = mb->cont (); i != 0; i = i->cont ())
+ {
+ ACE_OS::memcpy (rd_ptr, i->rd_ptr (), i->length ());
+ rd_ptr += i->length ();
+ total_length += i->length ();
+ }
+ }
+
+ // The algorithm works like this:
+ //
+ // For each hole we determine if there is an intersection between
+ // the hole and the incoming fragment. If there is none we do
+ // nothing (actually since the holes are ordered we can stop the
+ // iteration if the
+
+ ACE_UINT32 start = offset;
+ ACE_UINT32 end = offset + total_length;
+
+ for (size_t i = 0; i < this->hole_count_; ++i)
+ {
+ Hole& hole = this->hole_list_[i];
+
+ if (end <= hole.start)
+ return 0;
+ if (start >= hole.end)
+ continue;
+
+ // OK there is some intersection.
+
+ // There are only three cases for the <start> value:
+ // start < hole.start
+ // start == hole.start
+ // hole.start < start < hole.end
+ //
+ // But the code for both start == hole.start and start <
+ // hole.start is identical....
+
+ if (start <= hole.start)
+ {
+ if (end < hole.end)
+ {
+ // In this case we shrink the hole
+ hole.start = end;
+ return 0;
+ }
+ else // end >= hole.end
+ {
+ // We remove the hole, and continue the iteration...
+ if (this->remove_hole (i) == -1)
+ return -1;
+ continue;
+ }
+ }
+ else // hole.start < start < hole.end
+ {
+ if (end >= hole.end)
+ {
+ // Just adjust the size of the hole...
+ hole.start = start;
+ return 0;
+ }
+ else // if (end < hole.end)
+ {
+ // Nasty, we need to insert a new hole...
+ if (this->insert_hole (i, end, hole.end) == -1)
+ return -1;
+ // and change the old hole...
+ // NOTE: we have to refetch it because the array may
+ // have been reallocated!
+ this->hole_list_[i].end = start;
+ continue;
+ }
+ }
+ }
+ return 0;
+ // @@ OLD COMMENTS, the algorithm has changed since!
+ // There are several cases:
+ //
+ // 1) The fragment is completely contained in data already received,
+ // nothing changes in this case.
+ //
+ // 2) Part of the fragment is contained in data already received and
+ // part is new data:
+ // 2.1) The new data closes a hole, remove it from the list
+ // 2.2) The beginning of the new fragment is the new data, reduce
+ // the size of the hole
+ // 2.3) The end of the new fragment is the new data, increment
+ // the size of the received block
+ //
+ // 3) The fragment is completely contained in a hole
+ // 3.1) It closes the hole, remove it from the list
+ // 3.2) It starts at the beginning of a hole, grow the received
+ // block
+ // 3.3) It ends at the end of a hole, reduce the hole size
+ // 3.4) It is in the middle of a hole, insert a new hole
+ //
+}
+
+int
+ACE_RMCast_Partial_Message::insert_hole (size_t i,
+ ACE_UINT32 start,
+ ACE_UINT32 end)
+{
+ if (this->hole_count_ == this->max_hole_count_)
+ {
+ this->max_hole_count_ *= 2;
+ Hole *tmp;
+ ACE_NEW_RETURN (tmp, Hole[this->max_hole_count_], -1);
+ for (size_t j = 0; j != this->hole_count_; ++j)
+ {
+ tmp[j] = this->hole_list_[j];
+ }
+ }
+ for (size_t j = this->hole_count_; j != i + 1; --j)
+ {
+ this->hole_list_[j] = this->hole_list_[j - 1];
+ }
+
+ this->hole_list_[i + 1].start = start;
+ this->hole_list_[i + 1].end = end;
+ this->hole_count_++;
+
+ return 0;
+}
+
+int
+ACE_RMCast_Partial_Message::remove_hole (size_t i)
+{
+ for (size_t j = i; j != this->hole_count_ - 1; ++j)
+ this->hole_list_[j] = this->hole_list_[j + 1];
+
+ this->hole_count_--;
+ return 0;
+}
diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.h b/protocols/ace/RMCast/RMCast_Partial_Message.h
new file mode 100644
index 00000000000..af66435825a
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Partial_Message.h
@@ -0,0 +1,74 @@
+// $Id$
+
+// ============================================================================
+//
+// = DESCRIPTION
+// Helper class used in the reassembly layer of the realiable
+// multicast library.
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_PARTIAL_MESSAGE_H
+#define ACE_RMCAST_PARTIAL_MESSAGE_H
+#include "ace/pre.h"
+
+#include "RMCast_Export.h"
+#include "ace/Task.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#ifndef ACE_RMCAST_DEFAULT_HOLE_COUNT
+#define ACE_RMCAST_DEFAULT_HOLE_COUNT 16
+#endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */
+
+class ACE_RMCast_Export ACE_RMCast_Partial_Message
+{
+public:
+ ACE_RMCast_Partial_Message (ACE_UINT32 message_size);
+ ~ACE_RMCast_Partial_Message (void);
+
+ int fragment_received (ACE_UINT32 message_size,
+ ACE_UINT32 offset,
+ ACE_Message_Block *mb);
+ int is_complete (void) const;
+
+ ACE_Message_Block *message_body (void);
+ // Return the body of the message, the memory is owned by the
+ // class.
+
+private:
+ int insert_hole (size_t i,
+ ACE_UINT32 start,
+ ACE_UINT32 end);
+ // Insert a new hole into the list
+
+ int remove_hole (size_t i);
+ // Remove a hole from the list
+
+private:
+ ACE_Message_Block message_body_;
+ // Used to rebuild the body of the message
+
+ struct Hole
+ {
+ ACE_UINT32 start;
+ ACE_UINT32 end;
+ };
+
+ Hole *hole_list_;
+ size_t max_hole_count_;
+ size_t hole_count_;
+ // The current list of holes in the message_body.
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Partial_Message.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_PARTIAL_MESSAGE_H */
diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.i b/protocols/ace/RMCast/RMCast_Partial_Message.i
new file mode 100644
index 00000000000..7be89aa1932
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Partial_Message.i
@@ -0,0 +1,15 @@
+// $Id$
+
+ACE_INLINE int
+ACE_RMCast_Partial_Message::is_complete (void) const
+{
+ return (this->hole_count_ == 0)
+ || (this->hole_count_ == 1
+ && this->hole_list_[0].start == this->hole_list_[0].end);
+}
+
+ACE_INLINE ACE_Message_Block *
+ACE_RMCast_Partial_Message::message_body (void)
+{
+ return &this->message_body_;
+}
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp
new file mode 100644
index 00000000000..be56e6cd9e8
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp
@@ -0,0 +1,108 @@
+// $Id$
+
+#ifndef ACE_RMCAST_REASSEMBLY_C
+#define ACE_RMCAST_REASSEMBLY_C
+
+#include "RMCast_Reassembly.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "RMCast_Partial_Message.h"
+
+#if !defined (__ACE_INLINE__)
+#include "RMCast_Reassembly.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Reassembly, "$Id$")
+
+
+template <ACE_SYNCH_DECL>
+ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
+ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr,
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq)
+ : ACE_Task<ACE_SYNCH_USE> (thr_mgr, mq)
+{
+}
+
+template <ACE_SYNCH_DECL>
+ACE_RMCast_Reassembly<ACE_SYNCH_USE>::
+~ACE_RMCast_Reassembly (void)
+{
+ for (Message_Map_Iterator i = this->messages_.begin ();
+ i != this->messages_.end ();
+ ++i)
+ {
+ ACE_RMCast_Partial_Message *message = (*i).int_id_;
+ if (message != 0)
+ delete message;
+ }
+ this->messages_.unbind_all ();
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_RMCast_Reassembly<ACE_SYNCH_USE>::put (ACE_Message_Block *mb,
+ ACE_Time_Value *tv)
+{
+ ACE_UINT32 header[3];
+ size_t fragment_header_size = sizeof(header);
+
+ if (mb->length () < fragment_header_size)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Message block too small, "
+ "not enough room for the header\n"),
+ -1);
+
+ ACE_OS::memcpy (header, mb->rd_ptr (), fragment_header_size);
+
+ ACE_UINT32 message_sequence_number = ACE_NTOHL(header[0]);
+ ACE_UINT32 offset = ACE_NTOHL(header[1]);
+ ACE_UINT32 message_size = ACE_NTOHL(header[2]);
+
+ if (mb->length () + offset > message_size)
+ return -1; // Corrupt message?
+
+ ACE_RMCast_Partial_Message *message;
+
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->mutex_, -1);
+ if (this->messages_.find (message_sequence_number, message) == -1)
+ {
+ ACE_NEW_RETURN (message,
+ ACE_RMCast_Partial_Message (message_size),
+ -1);
+
+ if (this->messages_.bind (message_sequence_number,
+ message) == -1)
+ return -1; // Internal error?
+ }
+
+ // The message was in the collection, but it has been received
+ // already, this is a duplicate fragment, just drop it.
+ if (message == 0)
+ return 0;
+
+ if (message->fragment_received (message_size,
+ offset,
+ mb) == -1)
+ return -1;
+
+ if (!message->is_complete ())
+ return 0;
+
+ // Remove the message from the collection, but leave a marker
+ // to indicate that it was already received...
+ if (this->messages_.rebind (message_sequence_number, 0) == -1)
+ return -1;
+ }
+
+ // Push the message...
+ int r = this->put_next (message->message_body (), tv);
+
+ delete message;
+
+ return r;
+}
+
+#endif /* ACE_RMCAST_REASSEMBLY_C */
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h
new file mode 100644
index 00000000000..456c060948a
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reassembly.h
@@ -0,0 +1,67 @@
+// $Id$
+
+// ============================================================================
+//
+// = DESCRIPTION
+// The reassembly task for the reliable multicast library
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@cs.wustl.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_REASSEMBLY_H
+#define ACE_RMCAST_REASSEMBLY_H
+#include "ace/pre.h"
+
+#include "RMCast_Export.h"
+#include "ace/Task.h"
+#include "ace/Hash_Map_Manager.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Partial_Message;
+
+template <ACE_SYNCH_DECL>
+class ACE_RMCast_Export ACE_RMCast_Reassembly : public ACE_Task<ACE_SYNCH_USE>
+{
+public:
+ ACE_RMCast_Reassembly (ACE_Thread_Manager *thr_mgr = 0,
+ ACE_Message_Queue<ACE_SYNCH_USE> *mq = 0);
+ // Constructor
+
+ virtual ~ACE_RMCast_Reassembly (void);
+ // Destructor
+
+ // = The ACE_Task methods
+ int put (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
+
+private:
+ ACE_SYNCH_MUTEX_T mutex_;
+ typedef
+ ACE_Hash_Map_Manager<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>
+ Message_Map;
+ typedef
+ ACE_Hash_Map_Iterator<ACE_UINT32,ACE_RMCast_Partial_Message*,ACE_Null_Mutex>
+ Message_Map_Iterator;
+
+ Message_Map messages_;
+ // The array of partially received messages
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Reassembly.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Reassembly.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Reassembly.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_REASSEMBLY_H */
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.i b/protocols/ace/RMCast/RMCast_Reassembly.i
new file mode 100644
index 00000000000..74e88caa0c5
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reassembly.i
@@ -0,0 +1,2 @@
+// $Id$
+