summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-02 18:40:06 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-02 18:40:06 +0000
commit3c7edb739b596313a3e15fe1373bd488c2f37009 (patch)
treeec3f3e0df41c8f76d9c1ae3d472221e78a23d61f
parent6c8cdfe85f70b9be20d1dd80f9730dae491ff403 (diff)
downloadATCD-3c7edb739b596313a3e15fe1373bd488c2f37009.tar.gz
ChangeLogTag:Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--ChangeLog203
-rw-r--r--ChangeLogs/ChangeLog-02a203
-rw-r--r--ChangeLogs/ChangeLog-03a203
-rw-r--r--ace/RMCast/Makefile20
-rw-r--r--ace/RMCast/RMCast.h15
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.cpp176
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.h173
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.i36
-rw-r--r--ace/RMCast/RMCast_Fragment.h40
-rw-r--r--ace/RMCast/RMCast_IO_UDP.cpp2
-rw-r--r--ace/RMCast/RMCast_IO_UDP.h65
-rw-r--r--ace/RMCast/RMCast_Membership.cpp24
-rw-r--r--ace/RMCast/RMCast_Membership.h46
-rw-r--r--ace/RMCast/RMCast_Membership.i2
-rw-r--r--ace/RMCast/RMCast_Module.h32
-rw-r--r--ace/RMCast/RMCast_Module_Factory.h31
-rw-r--r--ace/RMCast/RMCast_Partial_Message.h40
-rw-r--r--ace/RMCast/RMCast_Proxy.cpp6
-rw-r--r--ace/RMCast/RMCast_Proxy.h27
-rw-r--r--ace/RMCast/RMCast_Proxy.i2
-rw-r--r--ace/RMCast/RMCast_Retransmission.cpp127
-rw-r--r--ace/RMCast/RMCast_Retransmission.h67
-rw-r--r--ace/RMCast/RMCast_UDP_Event_Handler.h40
-rw-r--r--ace/RMCast/RMCast_UDP_Proxy.cpp3
-rw-r--r--ace/RMCast/RMCast_Worker.cpp19
-rw-r--r--ace/RMCast/RMCast_Worker.h36
-rw-r--r--ace/RMCast/RMCast_Worker.i1
-rw-r--r--protocols/ace/RMCast/Makefile20
-rw-r--r--protocols/ace/RMCast/RMCast.h15
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.cpp176
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.h173
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.i36
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.h40
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.cpp2
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.h65
-rw-r--r--protocols/ace/RMCast/RMCast_Membership.cpp24
-rw-r--r--protocols/ace/RMCast/RMCast_Membership.h46
-rw-r--r--protocols/ace/RMCast/RMCast_Membership.i2
-rw-r--r--protocols/ace/RMCast/RMCast_Module.h32
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.h31
-rw-r--r--protocols/ace/RMCast/RMCast_Partial_Message.h40
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.cpp6
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.h27
-rw-r--r--protocols/ace/RMCast/RMCast_Proxy.i2
-rw-r--r--protocols/ace/RMCast/RMCast_Retransmission.cpp127
-rw-r--r--protocols/ace/RMCast/RMCast_Retransmission.h67
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.h40
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.cpp3
-rw-r--r--protocols/ace/RMCast/RMCast_Worker.cpp19
-rw-r--r--protocols/ace/RMCast/RMCast_Worker.h36
-rw-r--r--protocols/ace/RMCast/RMCast_Worker.i1
-rw-r--r--tests/Makefile6
-rw-r--r--tests/RMCast/Makefile166
-rw-r--r--tests/RMCast/RMCast_Membership_Test.cpp32
-rw-r--r--tests/RMCast/RMCast_Retransmission_Test.cpp457
-rw-r--r--tests/RMCast/RMCast_Retransmission_Test.dsp96
-rw-r--r--tests/RMCast/RMCast_Tests.dsw12
57 files changed, 2842 insertions, 596 deletions
diff --git a/ChangeLog b/ChangeLog
index 549d519a4e6..0b20947f8fd 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,13 +1,68 @@
+Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/RMCast_Retransmission.h:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Fixed several minor problems in the Retransmission module.
+ Use Copy_On_Write dispatching for synchronization of the
+ internal message buffer.
+
+ * ace/RMCast/RMCast_Copy_On_Write.h:
+ * ace/RMCast/RMCast_Copy_On_Write.i:
+ * ace/RMCast/RMCast_Copy_On_Write.cpp:
+ Implement Copy_On_Write semantics for a collection like the
+ retransmission buffer. This is based on the ESF_Copy_On_Write
+ strategy developed for the event channel.
+ We should try to refactor this one in a reusable strategy.
+
+ * ace/RMCast/RMCast_Worker.h:
+ * ace/RMCast/RMCast_Worker.i:
+ * ace/RMCast/RMCast_Worker.cpp:
+ To implement copy-on-write we need the "alternative" form of
+ iterators.
+
+ * ace/RMCast/RMCast.h:
+ * ace/RMCast/RMCast_Fragment.h:
+ * ace/RMCast/RMCast_IO_UDP.h:
+ * ace/RMCast/RMCast_IO_UDP.cpp:
+ * ace/RMCast/RMCast_Membership.h:
+ * ace/RMCast/RMCast_Membership.i:
+ * ace/RMCast/RMCast_Membership.cpp:
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Module_Factory.h:
+ * ace/RMCast/RMCast_Partial_Message.h:
+ * ace/RMCast/RMCast_Proxy.h:
+ * ace/RMCast/RMCast_Proxy.i:
+ * ace/RMCast/RMCast_Proxy.cpp:
+ * ace/RMCast/RMCast_UDP_Event_Handler.h:
+ * ace/RMCast/RMCast_UDP_Proxy.cpp:
+ * tests/RMCast/RMCast_Membership_Test.cpp:
+ Made the comments more doxygen friendly.
+ Renamed the highest_in_sequence field to next_expected, the new
+ name really reflects its semantics, this is the next sequence
+ number that the peer is expecting. All the previous numbers
+ have been either received or are simply assumed lost.
+
+ * tests/Makefile:
+ If the rmcast makefile flag is set to 1 we build the RMCast
+ subdirectory too.
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_Retransmission_Test.dsp:
+ * tests/RMCast/RMCast_Retransmission_Test.cpp:
+ New test for the Retransmission module
+
+
Sun Oct 01 15:50:42 2000 Darrell Brunsch <brunsch@uci.edu>
* bin/auto_run_tests.pl:
Added support for a sandbox program that can shutdown a test
- if it hangs.
+ if it hangs.
Fri Sep 29 16:32:22 2000 Darrell Brunsch <brunsch@uci.edu>
- * bin/msvc_auto_compile.pl:
+ * bin/msvc_auto_compile.pl:
Added the BE and FE projects for TAO_IDL so the static version
gets built correctly in auto_compiles.
@@ -24,123 +79,123 @@ Fri Sep 29 16:17:34 2000 Steve Huston <shuston@riverace.com>
Thu Sep 28 22:06:44 2000 Ossama Othman <ossama@uci.edu>
- * ace/SSL/SSL_SOCK_Connector.h:
- * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector,
- shared_connect_start, shared_connect_finish, connect, complete):
+ * ace/SSL/SSL_SOCK_Connector.h:
+ * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector,
+ shared_connect_start, shared_connect_finish, connect, complete):
- Made ACE_Time_Value arguments const to match the changes
- detailed in:
+ Made ACE_Time_Value arguments const to match the changes
+ detailed in:
- Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
+ Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
Fri Sep 29 00:23:59 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/Log_Msg.h (ACE_RETURN): Fixed minor error.
+ * ace/Log_Msg.h (ACE_RETURN): Fixed minor error.
Thu Sep 28 22:13:04 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/Log_Msg: Macros are evil! All the ACE debugging macros were
- evaluating the user arguments twice, once in
- log_priority_enabled() and the other in log(). The problem was
- that set() was making a deep copy of the filename. Hence, this
- change:
+ * ace/Log_Msg: Macros are evil! All the ACE debugging macros were
+ evaluating the user arguments twice, once in
+ log_priority_enabled() and the other in log(). The problem was
+ that set() was making a deep copy of the filename. Hence, this
+ change:
- Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+ Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- was made to prevent the deep copy which was unnecessary when the
- message was not actually logged. However, this resulted in the
- macro evaluating the user arguments twice. Previously, a
- statement like this:
+ was made to prevent the deep copy which was unnecessary when the
+ message was not actually logged. However, this resulted in the
+ macro evaluating the user arguments twice. Previously, a
+ statement like this:
ACE_DEBUG ((LM_DEBUG,
"timeout occured, iterations left %d\n",
--iterations));
- <iterations> got reduced by one - now it got reduced by two ;-)
+ <iterations> got reduced by one - now it got reduced by two ;-)
- The solution was to make a shallow copy of the filename in
- conditional_set(). Then in log(), if the log priority is
- correct, make a deep copy and then continue will log(). The
- macros were changed to call conditional_set() instead of set().
+ The solution was to make a shallow copy of the filename in
+ conditional_set(). Then in log(), if the log priority is
+ correct, make a deep copy and then continue will log(). The
+ macros were changed to call conditional_set() instead of set().
- Also, changed ACE_RETURN to specify all the parameters,
- including <restart>, <callback>, and <stream>. Otherwise, the
- default parameters of set() will end up losing these parameters
- set by the user.
+ Also, changed ACE_RETURN to specify all the parameters,
+ including <restart>, <callback>, and <stream>. Otherwise, the
+ default parameters of set() will end up losing these parameters
+ set by the user.
Thu Sep 28 15:49:00 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify
- that the ACE_Time_Value's are *absolute* time. Thanks to Pedro
- for reporting this. Thanks to Pedro Brandao
- <pbrandao@inescn.pt> for reporting this.
+ * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify
+ that the ACE_Time_Value's are *absolute* time. Thanks to Pedro
+ for reporting this. Thanks to Pedro Brandao
+ <pbrandao@inescn.pt> for reporting this.
Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/SOCK_Connector.cpp: ACE_Time_Value args to
- ACE_SOCK_Connector could be const. Had to patch functions in
- the following files to make it happen:
+ * ace/SOCK_Connector.cpp: ACE_Time_Value args to
+ ACE_SOCK_Connector could be const. Had to patch functions in
+ the following files to make it happen:
- ACE.cpp
- ACE.h
- OS.h
- OS.i
- SOCK_Connector.cpp
- SOCK_Connector.h
+ ACE.cpp
+ ACE.h
+ OS.h
+ OS.i
+ SOCK_Connector.cpp
+ SOCK_Connector.h
- Thanks to Steve Huston <shuston@riverace.com> for reporting
- this. This also fixes bug 673.
+ Thanks to Steve Huston <shuston@riverace.com> for reporting
+ this. This also fixes bug 673.
Thu Sep 28 11:14:29 2000 Martin Stack <mstack@cambertx.com>
- * ace/config-freebsd.h:
- * ace/config-freebsd-pthread.h:
- * ace/config-linux-common.h:
- * ace/config-irix6.x-common.h: Added/Renamed to
- ACE_USES_NEW_TERMIOS_STRUCT.
+ * ace/config-freebsd.h:
+ * ace/config-freebsd-pthread.h:
+ * ace/config-linux-common.h:
+ * ace/config-irix6.x-common.h: Added/Renamed to
+ ACE_USES_NEW_TERMIOS_STRUCT.
- * ace/TTY_IO.h:
- * ace/TTY_IO.cpp: The Win32 section was modified to implement a
- non-blocking read when read-timeout=0 is set. Also, it was
- modified to ensure proper operations when a read_timeout is
- required.
+ * ace/TTY_IO.h:
+ * ace/TTY_IO.cpp: The Win32 section was modified to implement a
+ non-blocking read when read-timeout=0 is set. Also, it was
+ modified to ensure proper operations when a read_timeout is
+ required.
- Code was added to enable the DTR line on both Win32 and unix
- platforms when the port is opened.
+ Code was added to enable the DTR line on both Win32 and unix
+ platforms when the port is opened.
- Several new flags where added to give proper access to the
- serial device.
+ Several new flags where added to give proper access to the
+ serial device.
- Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to
- "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose.
+ Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to
+ "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose.
Thu Sep 28 09:01:19 2000 Ossama Othman <ossama@uci.edu>
- * ace/config-g++-common.h:
+ * ace/config-g++-common.h:
- Reverted my g++ 2.95 updates. They work on all platforms but
- the cross-compiler used for VxWorks.
+ Reverted my g++ 2.95 updates. They work on all platforms but
+ the cross-compiler used for VxWorks.
Wed Sep 27 16:17:36 2000 Ossama Othman <ossama@uci.edu>
- * ace/IOStream.h:
- * ace/IOStream_T.h:
+ * ace/IOStream.h:
+ * ace/IOStream_T.h:
- Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to
- `IOStream_T.h'. They weren't needed in `IOStream.h'.
+ Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to
+ `IOStream_T.h'. They weren't needed in `IOStream.h'.
- * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR,
- ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES,
- ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS):
+ * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR,
+ ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES,
+ ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS):
- G++ 2.95.x properly support the auto_ptr class, templates with
- static data members, and inlined template functions.
+ G++ 2.95.x properly support the auto_ptr class, templates with
+ static data members, and inlined template functions.
Wed Sep 27 14:02:30 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * examples/Reactor/WFMO_Reactor/test_abandoned.cpp
- (handle_timeout): Moved <--this->iterations_> outside the DEBUG
- statement.
+ * examples/Reactor/WFMO_Reactor/test_abandoned.cpp
+ (handle_timeout): Moved <--this->iterations_> outside the DEBUG
+ statement.
Wed Sep 27 08:46:12 2000 Carlos O'Ryan <coryan@uci.edu>
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 549d519a4e6..0b20947f8fd 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,13 +1,68 @@
+Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/RMCast_Retransmission.h:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Fixed several minor problems in the Retransmission module.
+ Use Copy_On_Write dispatching for synchronization of the
+ internal message buffer.
+
+ * ace/RMCast/RMCast_Copy_On_Write.h:
+ * ace/RMCast/RMCast_Copy_On_Write.i:
+ * ace/RMCast/RMCast_Copy_On_Write.cpp:
+ Implement Copy_On_Write semantics for a collection like the
+ retransmission buffer. This is based on the ESF_Copy_On_Write
+ strategy developed for the event channel.
+ We should try to refactor this one in a reusable strategy.
+
+ * ace/RMCast/RMCast_Worker.h:
+ * ace/RMCast/RMCast_Worker.i:
+ * ace/RMCast/RMCast_Worker.cpp:
+ To implement copy-on-write we need the "alternative" form of
+ iterators.
+
+ * ace/RMCast/RMCast.h:
+ * ace/RMCast/RMCast_Fragment.h:
+ * ace/RMCast/RMCast_IO_UDP.h:
+ * ace/RMCast/RMCast_IO_UDP.cpp:
+ * ace/RMCast/RMCast_Membership.h:
+ * ace/RMCast/RMCast_Membership.i:
+ * ace/RMCast/RMCast_Membership.cpp:
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Module_Factory.h:
+ * ace/RMCast/RMCast_Partial_Message.h:
+ * ace/RMCast/RMCast_Proxy.h:
+ * ace/RMCast/RMCast_Proxy.i:
+ * ace/RMCast/RMCast_Proxy.cpp:
+ * ace/RMCast/RMCast_UDP_Event_Handler.h:
+ * ace/RMCast/RMCast_UDP_Proxy.cpp:
+ * tests/RMCast/RMCast_Membership_Test.cpp:
+ Made the comments more doxygen friendly.
+ Renamed the highest_in_sequence field to next_expected, the new
+ name really reflects its semantics, this is the next sequence
+ number that the peer is expecting. All the previous numbers
+ have been either received or are simply assumed lost.
+
+ * tests/Makefile:
+ If the rmcast makefile flag is set to 1 we build the RMCast
+ subdirectory too.
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_Retransmission_Test.dsp:
+ * tests/RMCast/RMCast_Retransmission_Test.cpp:
+ New test for the Retransmission module
+
+
Sun Oct 01 15:50:42 2000 Darrell Brunsch <brunsch@uci.edu>
* bin/auto_run_tests.pl:
Added support for a sandbox program that can shutdown a test
- if it hangs.
+ if it hangs.
Fri Sep 29 16:32:22 2000 Darrell Brunsch <brunsch@uci.edu>
- * bin/msvc_auto_compile.pl:
+ * bin/msvc_auto_compile.pl:
Added the BE and FE projects for TAO_IDL so the static version
gets built correctly in auto_compiles.
@@ -24,123 +79,123 @@ Fri Sep 29 16:17:34 2000 Steve Huston <shuston@riverace.com>
Thu Sep 28 22:06:44 2000 Ossama Othman <ossama@uci.edu>
- * ace/SSL/SSL_SOCK_Connector.h:
- * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector,
- shared_connect_start, shared_connect_finish, connect, complete):
+ * ace/SSL/SSL_SOCK_Connector.h:
+ * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector,
+ shared_connect_start, shared_connect_finish, connect, complete):
- Made ACE_Time_Value arguments const to match the changes
- detailed in:
+ Made ACE_Time_Value arguments const to match the changes
+ detailed in:
- Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
+ Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
Fri Sep 29 00:23:59 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/Log_Msg.h (ACE_RETURN): Fixed minor error.
+ * ace/Log_Msg.h (ACE_RETURN): Fixed minor error.
Thu Sep 28 22:13:04 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/Log_Msg: Macros are evil! All the ACE debugging macros were
- evaluating the user arguments twice, once in
- log_priority_enabled() and the other in log(). The problem was
- that set() was making a deep copy of the filename. Hence, this
- change:
+ * ace/Log_Msg: Macros are evil! All the ACE debugging macros were
+ evaluating the user arguments twice, once in
+ log_priority_enabled() and the other in log(). The problem was
+ that set() was making a deep copy of the filename. Hence, this
+ change:
- Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+ Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- was made to prevent the deep copy which was unnecessary when the
- message was not actually logged. However, this resulted in the
- macro evaluating the user arguments twice. Previously, a
- statement like this:
+ was made to prevent the deep copy which was unnecessary when the
+ message was not actually logged. However, this resulted in the
+ macro evaluating the user arguments twice. Previously, a
+ statement like this:
ACE_DEBUG ((LM_DEBUG,
"timeout occured, iterations left %d\n",
--iterations));
- <iterations> got reduced by one - now it got reduced by two ;-)
+ <iterations> got reduced by one - now it got reduced by two ;-)
- The solution was to make a shallow copy of the filename in
- conditional_set(). Then in log(), if the log priority is
- correct, make a deep copy and then continue will log(). The
- macros were changed to call conditional_set() instead of set().
+ The solution was to make a shallow copy of the filename in
+ conditional_set(). Then in log(), if the log priority is
+ correct, make a deep copy and then continue will log(). The
+ macros were changed to call conditional_set() instead of set().
- Also, changed ACE_RETURN to specify all the parameters,
- including <restart>, <callback>, and <stream>. Otherwise, the
- default parameters of set() will end up losing these parameters
- set by the user.
+ Also, changed ACE_RETURN to specify all the parameters,
+ including <restart>, <callback>, and <stream>. Otherwise, the
+ default parameters of set() will end up losing these parameters
+ set by the user.
Thu Sep 28 15:49:00 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify
- that the ACE_Time_Value's are *absolute* time. Thanks to Pedro
- for reporting this. Thanks to Pedro Brandao
- <pbrandao@inescn.pt> for reporting this.
+ * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify
+ that the ACE_Time_Value's are *absolute* time. Thanks to Pedro
+ for reporting this. Thanks to Pedro Brandao
+ <pbrandao@inescn.pt> for reporting this.
Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/SOCK_Connector.cpp: ACE_Time_Value args to
- ACE_SOCK_Connector could be const. Had to patch functions in
- the following files to make it happen:
+ * ace/SOCK_Connector.cpp: ACE_Time_Value args to
+ ACE_SOCK_Connector could be const. Had to patch functions in
+ the following files to make it happen:
- ACE.cpp
- ACE.h
- OS.h
- OS.i
- SOCK_Connector.cpp
- SOCK_Connector.h
+ ACE.cpp
+ ACE.h
+ OS.h
+ OS.i
+ SOCK_Connector.cpp
+ SOCK_Connector.h
- Thanks to Steve Huston <shuston@riverace.com> for reporting
- this. This also fixes bug 673.
+ Thanks to Steve Huston <shuston@riverace.com> for reporting
+ this. This also fixes bug 673.
Thu Sep 28 11:14:29 2000 Martin Stack <mstack@cambertx.com>
- * ace/config-freebsd.h:
- * ace/config-freebsd-pthread.h:
- * ace/config-linux-common.h:
- * ace/config-irix6.x-common.h: Added/Renamed to
- ACE_USES_NEW_TERMIOS_STRUCT.
+ * ace/config-freebsd.h:
+ * ace/config-freebsd-pthread.h:
+ * ace/config-linux-common.h:
+ * ace/config-irix6.x-common.h: Added/Renamed to
+ ACE_USES_NEW_TERMIOS_STRUCT.
- * ace/TTY_IO.h:
- * ace/TTY_IO.cpp: The Win32 section was modified to implement a
- non-blocking read when read-timeout=0 is set. Also, it was
- modified to ensure proper operations when a read_timeout is
- required.
+ * ace/TTY_IO.h:
+ * ace/TTY_IO.cpp: The Win32 section was modified to implement a
+ non-blocking read when read-timeout=0 is set. Also, it was
+ modified to ensure proper operations when a read_timeout is
+ required.
- Code was added to enable the DTR line on both Win32 and unix
- platforms when the port is opened.
+ Code was added to enable the DTR line on both Win32 and unix
+ platforms when the port is opened.
- Several new flags where added to give proper access to the
- serial device.
+ Several new flags where added to give proper access to the
+ serial device.
- Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to
- "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose.
+ Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to
+ "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose.
Thu Sep 28 09:01:19 2000 Ossama Othman <ossama@uci.edu>
- * ace/config-g++-common.h:
+ * ace/config-g++-common.h:
- Reverted my g++ 2.95 updates. They work on all platforms but
- the cross-compiler used for VxWorks.
+ Reverted my g++ 2.95 updates. They work on all platforms but
+ the cross-compiler used for VxWorks.
Wed Sep 27 16:17:36 2000 Ossama Othman <ossama@uci.edu>
- * ace/IOStream.h:
- * ace/IOStream_T.h:
+ * ace/IOStream.h:
+ * ace/IOStream_T.h:
- Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to
- `IOStream_T.h'. They weren't needed in `IOStream.h'.
+ Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to
+ `IOStream_T.h'. They weren't needed in `IOStream.h'.
- * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR,
- ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES,
- ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS):
+ * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR,
+ ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES,
+ ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS):
- G++ 2.95.x properly support the auto_ptr class, templates with
- static data members, and inlined template functions.
+ G++ 2.95.x properly support the auto_ptr class, templates with
+ static data members, and inlined template functions.
Wed Sep 27 14:02:30 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * examples/Reactor/WFMO_Reactor/test_abandoned.cpp
- (handle_timeout): Moved <--this->iterations_> outside the DEBUG
- statement.
+ * examples/Reactor/WFMO_Reactor/test_abandoned.cpp
+ (handle_timeout): Moved <--this->iterations_> outside the DEBUG
+ statement.
Wed Sep 27 08:46:12 2000 Carlos O'Ryan <coryan@uci.edu>
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 549d519a4e6..0b20947f8fd 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,13 +1,68 @@
+Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/RMCast_Retransmission.h:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Fixed several minor problems in the Retransmission module.
+ Use Copy_On_Write dispatching for synchronization of the
+ internal message buffer.
+
+ * ace/RMCast/RMCast_Copy_On_Write.h:
+ * ace/RMCast/RMCast_Copy_On_Write.i:
+ * ace/RMCast/RMCast_Copy_On_Write.cpp:
+ Implement Copy_On_Write semantics for a collection like the
+ retransmission buffer. This is based on the ESF_Copy_On_Write
+ strategy developed for the event channel.
+ We should try to refactor this one in a reusable strategy.
+
+ * ace/RMCast/RMCast_Worker.h:
+ * ace/RMCast/RMCast_Worker.i:
+ * ace/RMCast/RMCast_Worker.cpp:
+ To implement copy-on-write we need the "alternative" form of
+ iterators.
+
+ * ace/RMCast/RMCast.h:
+ * ace/RMCast/RMCast_Fragment.h:
+ * ace/RMCast/RMCast_IO_UDP.h:
+ * ace/RMCast/RMCast_IO_UDP.cpp:
+ * ace/RMCast/RMCast_Membership.h:
+ * ace/RMCast/RMCast_Membership.i:
+ * ace/RMCast/RMCast_Membership.cpp:
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Module_Factory.h:
+ * ace/RMCast/RMCast_Partial_Message.h:
+ * ace/RMCast/RMCast_Proxy.h:
+ * ace/RMCast/RMCast_Proxy.i:
+ * ace/RMCast/RMCast_Proxy.cpp:
+ * ace/RMCast/RMCast_UDP_Event_Handler.h:
+ * ace/RMCast/RMCast_UDP_Proxy.cpp:
+ * tests/RMCast/RMCast_Membership_Test.cpp:
+ Made the comments more doxygen friendly.
+ Renamed the highest_in_sequence field to next_expected, the new
+ name really reflects its semantics, this is the next sequence
+ number that the peer is expecting. All the previous numbers
+ have been either received or are simply assumed lost.
+
+ * tests/Makefile:
+ If the rmcast makefile flag is set to 1 we build the RMCast
+ subdirectory too.
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_Retransmission_Test.dsp:
+ * tests/RMCast/RMCast_Retransmission_Test.cpp:
+ New test for the Retransmission module
+
+
Sun Oct 01 15:50:42 2000 Darrell Brunsch <brunsch@uci.edu>
* bin/auto_run_tests.pl:
Added support for a sandbox program that can shutdown a test
- if it hangs.
+ if it hangs.
Fri Sep 29 16:32:22 2000 Darrell Brunsch <brunsch@uci.edu>
- * bin/msvc_auto_compile.pl:
+ * bin/msvc_auto_compile.pl:
Added the BE and FE projects for TAO_IDL so the static version
gets built correctly in auto_compiles.
@@ -24,123 +79,123 @@ Fri Sep 29 16:17:34 2000 Steve Huston <shuston@riverace.com>
Thu Sep 28 22:06:44 2000 Ossama Othman <ossama@uci.edu>
- * ace/SSL/SSL_SOCK_Connector.h:
- * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector,
- shared_connect_start, shared_connect_finish, connect, complete):
+ * ace/SSL/SSL_SOCK_Connector.h:
+ * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector,
+ shared_connect_start, shared_connect_finish, connect, complete):
- Made ACE_Time_Value arguments const to match the changes
- detailed in:
+ Made ACE_Time_Value arguments const to match the changes
+ detailed in:
- Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
+ Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
Fri Sep 29 00:23:59 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/Log_Msg.h (ACE_RETURN): Fixed minor error.
+ * ace/Log_Msg.h (ACE_RETURN): Fixed minor error.
Thu Sep 28 22:13:04 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/Log_Msg: Macros are evil! All the ACE debugging macros were
- evaluating the user arguments twice, once in
- log_priority_enabled() and the other in log(). The problem was
- that set() was making a deep copy of the filename. Hence, this
- change:
+ * ace/Log_Msg: Macros are evil! All the ACE debugging macros were
+ evaluating the user arguments twice, once in
+ log_priority_enabled() and the other in log(). The problem was
+ that set() was making a deep copy of the filename. Hence, this
+ change:
- Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
+ Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- was made to prevent the deep copy which was unnecessary when the
- message was not actually logged. However, this resulted in the
- macro evaluating the user arguments twice. Previously, a
- statement like this:
+ was made to prevent the deep copy which was unnecessary when the
+ message was not actually logged. However, this resulted in the
+ macro evaluating the user arguments twice. Previously, a
+ statement like this:
ACE_DEBUG ((LM_DEBUG,
"timeout occured, iterations left %d\n",
--iterations));
- <iterations> got reduced by one - now it got reduced by two ;-)
+ <iterations> got reduced by one - now it got reduced by two ;-)
- The solution was to make a shallow copy of the filename in
- conditional_set(). Then in log(), if the log priority is
- correct, make a deep copy and then continue will log(). The
- macros were changed to call conditional_set() instead of set().
+ The solution was to make a shallow copy of the filename in
+ conditional_set(). Then in log(), if the log priority is
+ correct, make a deep copy and then continue will log(). The
+ macros were changed to call conditional_set() instead of set().
- Also, changed ACE_RETURN to specify all the parameters,
- including <restart>, <callback>, and <stream>. Otherwise, the
- default parameters of set() will end up losing these parameters
- set by the user.
+ Also, changed ACE_RETURN to specify all the parameters,
+ including <restart>, <callback>, and <stream>. Otherwise, the
+ default parameters of set() will end up losing these parameters
+ set by the user.
Thu Sep 28 15:49:00 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify
- that the ACE_Time_Value's are *absolute* time. Thanks to Pedro
- for reporting this. Thanks to Pedro Brandao
- <pbrandao@inescn.pt> for reporting this.
+ * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify
+ that the ACE_Time_Value's are *absolute* time. Thanks to Pedro
+ for reporting this. Thanks to Pedro Brandao
+ <pbrandao@inescn.pt> for reporting this.
Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * ace/SOCK_Connector.cpp: ACE_Time_Value args to
- ACE_SOCK_Connector could be const. Had to patch functions in
- the following files to make it happen:
+ * ace/SOCK_Connector.cpp: ACE_Time_Value args to
+ ACE_SOCK_Connector could be const. Had to patch functions in
+ the following files to make it happen:
- ACE.cpp
- ACE.h
- OS.h
- OS.i
- SOCK_Connector.cpp
- SOCK_Connector.h
+ ACE.cpp
+ ACE.h
+ OS.h
+ OS.i
+ SOCK_Connector.cpp
+ SOCK_Connector.h
- Thanks to Steve Huston <shuston@riverace.com> for reporting
- this. This also fixes bug 673.
+ Thanks to Steve Huston <shuston@riverace.com> for reporting
+ this. This also fixes bug 673.
Thu Sep 28 11:14:29 2000 Martin Stack <mstack@cambertx.com>
- * ace/config-freebsd.h:
- * ace/config-freebsd-pthread.h:
- * ace/config-linux-common.h:
- * ace/config-irix6.x-common.h: Added/Renamed to
- ACE_USES_NEW_TERMIOS_STRUCT.
+ * ace/config-freebsd.h:
+ * ace/config-freebsd-pthread.h:
+ * ace/config-linux-common.h:
+ * ace/config-irix6.x-common.h: Added/Renamed to
+ ACE_USES_NEW_TERMIOS_STRUCT.
- * ace/TTY_IO.h:
- * ace/TTY_IO.cpp: The Win32 section was modified to implement a
- non-blocking read when read-timeout=0 is set. Also, it was
- modified to ensure proper operations when a read_timeout is
- required.
+ * ace/TTY_IO.h:
+ * ace/TTY_IO.cpp: The Win32 section was modified to implement a
+ non-blocking read when read-timeout=0 is set. Also, it was
+ modified to ensure proper operations when a read_timeout is
+ required.
- Code was added to enable the DTR line on both Win32 and unix
- platforms when the port is opened.
+ Code was added to enable the DTR line on both Win32 and unix
+ platforms when the port is opened.
- Several new flags where added to give proper access to the
- serial device.
+ Several new flags where added to give proper access to the
+ serial device.
- Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to
- "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose.
+ Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to
+ "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose.
Thu Sep 28 09:01:19 2000 Ossama Othman <ossama@uci.edu>
- * ace/config-g++-common.h:
+ * ace/config-g++-common.h:
- Reverted my g++ 2.95 updates. They work on all platforms but
- the cross-compiler used for VxWorks.
+ Reverted my g++ 2.95 updates. They work on all platforms but
+ the cross-compiler used for VxWorks.
Wed Sep 27 16:17:36 2000 Ossama Othman <ossama@uci.edu>
- * ace/IOStream.h:
- * ace/IOStream_T.h:
+ * ace/IOStream.h:
+ * ace/IOStream_T.h:
- Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to
- `IOStream_T.h'. They weren't needed in `IOStream.h'.
+ Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to
+ `IOStream_T.h'. They weren't needed in `IOStream.h'.
- * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR,
- ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES,
- ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS):
+ * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR,
+ ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES,
+ ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS):
- G++ 2.95.x properly support the auto_ptr class, templates with
- static data members, and inlined template functions.
+ G++ 2.95.x properly support the auto_ptr class, templates with
+ static data members, and inlined template functions.
Wed Sep 27 14:02:30 2000 Irfan Pyarali <irfan@cs.wustl.edu>
- * examples/Reactor/WFMO_Reactor/test_abandoned.cpp
- (handle_timeout): Moved <--this->iterations_> outside the DEBUG
- statement.
+ * examples/Reactor/WFMO_Reactor/test_abandoned.cpp
+ (handle_timeout): Moved <--this->iterations_> outside the DEBUG
+ statement.
Wed Sep 27 08:46:12 2000 Carlos O'Ryan <coryan@uci.edu>
diff --git a/ace/RMCast/Makefile b/ace/RMCast/Makefile
index ae13792c4c4..e97cd885493 100644
--- a/ace/RMCast/Makefile
+++ b/ace/RMCast/Makefile
@@ -636,17 +636,11 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Basic_Types.i \
$(ACE_ROOT)/ace/Trace.h \
$(ACE_ROOT)/ace/OS.i \
- RMCast_Export.h RMCast.i RMCast_Module.i \
- $(ACE_ROOT)/ace/RB_Tree.h \
- $(ACE_ROOT)/ace/Functor.h \
+ RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \
+ RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/ACE.h \
$(ACE_ROOT)/ace/ACE.i \
- $(ACE_ROOT)/ace/Functor.i \
- $(ACE_ROOT)/ace/Functor_T.h \
- $(ACE_ROOT)/ace/Functor_T.i \
- $(ACE_ROOT)/ace/Functor_T.cpp \
- $(ACE_ROOT)/ace/RB_Tree.i \
- $(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/Synch.i \
$(ACE_ROOT)/ace/Synch_T.h \
$(ACE_ROOT)/ace/Event_Handler.h \
@@ -664,6 +658,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Log_Record.h \
$(ACE_ROOT)/ace/Log_Priority.h \
$(ACE_ROOT)/ace/Log_Record.i \
+ RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
$(ACE_ROOT)/ace/Malloc.h \
$(ACE_ROOT)/ace/Malloc_Base.h \
$(ACE_ROOT)/ace/Based_Pointer_T.h \
diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h
index abf2a24e946..df3a0d48858 100644
--- a/ace/RMCast/RMCast.h
+++ b/ace/RMCast/RMCast.h
@@ -233,16 +233,19 @@ public:
*
* This message is used to provide feedback information to senders.
* It contains two sequence numbers:
- * - highest_in_sequence: is the sequence number of the last message
- * received without any lost messages before it
- * - highest_received: is the sequence number of the last_message
- * successfully received, there may be some messages lost before it
+ * - \param next_expected: is the sequence number of the next message
+ * expected, i.e. (next_expected-1) is the last message received
+ * without any losses before it.
+ * - \param highest_received: is the highest sequence number among
+ * all the messages successfully received.
+ * In other words, all messages lost (if any) are in the range:
+ * [next_expected,highest_received)
*
* <CODE>
* +---------+----------------------+<BR>
* | 8 bits | MT_ACK |<BR>
* +---------+----------------------+<BR>
- * | 32 bits | highest_in_sequence |<BR>
+ * | 32 bits | next_expected |<BR>
* +---------+----------------------+<BR>
* | 32 bits | highest_received |<BR>
* +---------+----------------------+<BR>
@@ -251,7 +254,7 @@ public:
struct Ack
{
//! The last message received without any losses before it.
- ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 next_expected;
//! The last message successfully received
ACE_UINT32 highest_received;
diff --git a/ace/RMCast/RMCast_Copy_On_Write.cpp b/ace/RMCast/RMCast_Copy_On_Write.cpp
new file mode 100644
index 00000000000..f1553c7f4ab
--- /dev/null
+++ b/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -0,0 +1,176 @@
+// $Id$
+
+#ifndef ACE_RMCAST_COPY_ON_WRITE_CPP
+#define ACE_RMCAST_COPY_ON_WRITE_CPP
+
+#include "RMCast_Copy_On_Write.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "RMCast_Copy_On_Write.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(RMCast, RMCast_Copy_On_Write, "$Id$")
+
+template<class COLLECTION, class ITERATOR> void
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_incr_refcnt (void)
+{
+ // LOCKING: no locking is required, the caller grabs the mutex.
+ this->refcount_++;
+}
+
+template<class COLLECTION, class ITERATOR> void
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
+{
+ // LOCKING: no locking is required, the caller grabs the mutex.
+ {
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return;
+ }
+ //@@ TODO: If this wrapper is going to be completely general some
+ // kind of functor has to be provided to remove the elements in the
+ // collection, in case the are no self-managed
+
+ delete this;
+}
+
+// ****************************************************************
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write (void)
+ : pending_writes_ (0)
+ , writing_ (0)
+ , cond_ (mutex_)
+{
+ ACE_NEW (this->collection_, Collection);
+}
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write (void)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+
+ while (this->pending_writes_ != 0)
+ this->cond_.wait ();
+
+ this->collection_->_decr_refcnt ();
+ this->collection_ = 0;
+}
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ for_each (ACE_RMCast_Worker<KEY,ITEM> *worker)
+{
+ Read_Guard ace_mon (this->mutex_, this->collection_);
+
+ ITERATOR end = ace_mon.collection->collection.end ();
+ for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
+ {
+ int r = worker->work ((*i).key (), (*i).item ());
+ if (r != 0)
+ return r;
+ }
+ return 0;
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
+ ITEM const & i)
+{
+ Write_Guard ace_mon (this->mutex_,
+ this->cond_,
+ this->pending_writes_,
+ this->writing_,
+ this->collection_);
+
+ return this->bind_i (ace_mon, k, i);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k)
+{
+ Write_Guard ace_mon (this->mutex_,
+ this->cond_,
+ this->pending_writes_,
+ this->writing_,
+ this->collection_);
+
+ return this->unbind_i (ace_mon, k);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon,
+ KEY const & k,
+ ITEM const & i)
+{
+ return ace_mon.copy->collection.bind (k, i);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon,
+ KEY const & k)
+{
+ return ace_mon.copy->collection.unbind (k);
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m,
+ ACE_SYNCH_CONDITION &c,
+ int &p,
+ int &w,
+ Collection*& cr)
+ : copy (0)
+ , mutex (m)
+ , cond (c)
+ , pending_writes (p)
+ , writing_flag (w)
+ , collection (cr)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
+
+ this->pending_writes++;
+
+ while (this->writing_flag != 0)
+ this->cond.wait ();
+
+ this->writing_flag = 1;
+ }
+
+ // Copy outside the mutex, because it may take a long time.
+ // Nobody can change it, because it is protected by the
+ // writing_flag.
+
+ // First initialize it (with the correct reference count
+ ACE_NEW (this->copy, Collection);
+ // Copy the contents
+ this->copy->collection = this->collection->collection;
+}
+
+template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write_Write_Guard (void)
+{
+ Collection *tmp = 0;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
+
+ tmp = this->collection;
+ this->collection = this->copy;
+ this->writing_flag = 0;
+ this->pending_writes--;
+
+ this->cond.signal ();
+ }
+ // Delete outside the mutex, because it may take a long time.
+ tmp->_decr_refcnt ();
+}
+
+// ****************************************************************
+
+#endif /* ACE_RMCAST_COPY_ON_WRITE_CPP */
diff --git a/ace/RMCast/RMCast_Copy_On_Write.h b/ace/RMCast/RMCast_Copy_On_Write.h
new file mode 100644
index 00000000000..8724e23a5d5
--- /dev/null
+++ b/ace/RMCast/RMCast_Copy_On_Write.h
@@ -0,0 +1,173 @@
+/* -*- C++ -*- */
+// $Id$
+//
+
+#ifndef ACE_RMCAST_COPY_ON_WRITE_H
+#define ACE_RMCAST_COPY_ON_WRITE_H
+#include "ace/pre.h"
+
+#include "RMCast_Worker.h"
+#include "ace/Synch.h"
+
+//! A wrapper to implement reference counted collections
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Collection
+{
+public:
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Collection (void);
+
+ //! Increment the reference count
+ void _incr_refcnt (void);
+
+ //! Decrement the reference count
+ void _decr_refcnt (void);
+
+ //! The actual collection
+ COLLECTION collection;
+
+private:
+ //! The reference count
+ ACE_UINT32 refcount_;
+};
+
+// ****************************************************************
+
+//! Implement a read guard for a reference counted collection
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Read_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex,
+ Collection *&collection);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void);
+
+ //! A reference to the collection
+ Collection *collection;
+
+private:
+ //! Synchronization
+ ACE_SYNCH_MUTEX &mutex_;
+};
+
+// ****************************************************************
+
+//! Implement the write guard for a reference counted collecion
+/*!
+ * This helper class atomically increments the reference count of a
+ * ACE_RMCast_Copy_On_Write_Collection and reads the current
+ * collection in the Copy_On_Write class.
+ */
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Write_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex,
+ ACE_SYNCH_CONDITION &cond,
+ int &pending_writes,
+ int &writing_flag,
+ Collection*& collection);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Write_Guard (void);
+
+ //! The collection
+ Collection *copy;
+
+private:
+ //! Keep a reference to the mutex
+ ACE_SYNCH_MUTEX &mutex;
+
+ //! Keep a reference to the condition variable
+ ACE_SYNCH_CONDITION &cond;
+
+ //! Use a reference to update the pending writes count
+ int &pending_writes;
+
+ //! Use a reference to update the writing flag
+ int &writing_flag;
+
+ //! Use this reference to update the collection once the
+ //! modifications are finished.
+ Collection *&collection;
+};
+
+// ****************************************************************
+
+//! Implement a copy on write wrapper for a map-like collection
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write
+{
+public:
+ //! The Read_Guard trait
+ typedef ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR> Read_Guard;
+
+ //! The Write_Guard trait
+ typedef ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR> Write_Guard;
+
+ //! The underlying collection type
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write (void);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write (void);
+
+ //! Iterate over all the elements invoking \param worker on each one.
+ int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker);
+
+ //! Add a new element
+ int bind (KEY const & key, ITEM const & item);
+
+ //! Remove an element
+ int unbind (KEY const & key);
+
+ //! Bind assuming the Write_Guard is held
+ int bind_i (Write_Guard &guard, KEY const & key, ITEM const & item);
+
+ //! Unbind assuming the Write_Guard is held
+ int unbind_i (Write_Guard &guard, KEY const & key);
+
+ //! Number of pending writes
+ int pending_writes_;
+
+ //! If non-zero then a thread is changing the collection.
+ /*!
+ * Many threads can use the collection simulatenously, but only one
+ * change it.
+ */
+ int writing_;
+
+ //! A mutex to serialize access to the collection pointer.
+ ACE_SYNCH_MUTEX mutex_;
+
+ //! A condition variable to wait to synchronize multiple writers.
+ ACE_SYNCH_CONDITION cond_;
+
+ //! The collection, with reference counting added
+ Collection *collection_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Copy_On_Write.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Copy_On_Write.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Copy_On_Write.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_COPY_ON_WRITE_H */
diff --git a/ace/RMCast/RMCast_Copy_On_Write.i b/ace/RMCast/RMCast_Copy_On_Write.i
new file mode 100644
index 00000000000..c6e5099cda5
--- /dev/null
+++ b/ace/RMCast/RMCast_Copy_On_Write.i
@@ -0,0 +1,36 @@
+// $Id$
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Collection (void)
+ : refcount_ (1)
+{
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m,
+ Collection*& collection_ref)
+ : collection (0)
+ , mutex_ (m)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->collection = collection_ref;
+ this->collection->_incr_refcnt ();
+}
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void)
+{
+ if (this->collection != 0)
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->collection->_decr_refcnt ();
+ }
+}
+
+// ****************************************************************
+
diff --git a/ace/RMCast/RMCast_Fragment.h b/ace/RMCast/RMCast_Fragment.h
index 7b64d763ebc..eed08c92517 100644
--- a/ace/RMCast/RMCast_Fragment.h
+++ b/ace/RMCast/RMCast_Fragment.h
@@ -1,15 +1,5 @@
// $Id$
-// ============================================================================
-//
-// = DESCRIPTION
-// The fragmentation task for the reliable multicast library
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
#ifndef ACE_RMCAST_FRAGMENT_H
#define ACE_RMCAST_FRAGMENT_H
#include "ace/pre.h"
@@ -21,28 +11,46 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+//! Default fragment size
#ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE
# define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024
#endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */
+//! Fragmentation module
+/*!
+ * Some transports cannot send very big messages, for example UDP
+ * imposes a limit of 64K, and in practice the limit is even more
+ * strict than that.
+ * This class decomposes a message into multiple fragments, using an
+ * application defined maximum size.
+ */
class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_RMCast_Module
{
public:
+ //! Constructor
ACE_RMCast_Fragment (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Fragment (void);
- // Destructor
+ //! Accessor for the max_fragment size.
+ /*! There is no modifier, the maximum fragment size is obtained
+ * using feedback from the lower layers (transport?)
+ * @@TODO We have not implemented the feedback mechanisms yet!
+ */
size_t max_fragment_size (void) const;
- // Accessor for the max_fragment size.
- // There is no modifier, the maximum fragment size is obtained using
- // feedback from the lower layer (transport?)
- // = The ACE_RMCast_Module methods
+ /*!
+ * Only data messages need fragmentation, the control messages are
+ * all small enough for all the transports that I know about.
+ * Well, actually for CAN-Bus (Controller Area Network), they may be
+ * too big, because the max payload there is 8 bytes, but we don't
+ * play with those in ACE.
+ */
virtual int data (ACE_RMCast::Data &data);
private:
+ //! Current fragment size limit
size_t max_fragment_size_;
};
diff --git a/ace/RMCast/RMCast_IO_UDP.cpp b/ace/RMCast/RMCast_IO_UDP.cpp
index af655f3130f..421982d5ad6 100644
--- a/ace/RMCast/RMCast_IO_UDP.cpp
+++ b/ace/RMCast/RMCast_IO_UDP.cpp
@@ -354,7 +354,7 @@ ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
char header[16];
header[0] = ACE_RMCast::MT_ACK;
- ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_UINT32 tmp = ACE_HTONL (ack.next_expected);
ACE_OS::memcpy (header + 1,
&tmp, sizeof(ACE_UINT32));
tmp = ACE_HTONL (ack.highest_received);
diff --git a/ace/RMCast/RMCast_IO_UDP.h b/ace/RMCast/RMCast_IO_UDP.h
index bdcccabe6e1..5af403bf994 100644
--- a/ace/RMCast/RMCast_IO_UDP.h
+++ b/ace/RMCast/RMCast_IO_UDP.h
@@ -33,44 +33,65 @@ class ACE_Time_Value;
class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module
{
public:
+ //! Constructor
+ /*!
+ * The <factory> argument is used to create the modules for each
+ * proxy that process incoming messages. The class does *not* assume
+ * ownership of <factory>, the caller owns it. But it does assume
+ * ownership of the modules returned by the factory, and it may ask
+ * the factory to release them eventually.
+ */
ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory);
- // Constructor
- // <factory> is used to create the modules for each proxy that
- // process incoming messages. The class does *not* assume ownership
- // of <factory>, the caller owns it.
+ //! Destructor
~ACE_RMCast_IO_UDP (void);
- // Destructor
+ //! Join a new multicast group
+ /*!
+ * Start receiving data for the <mcast_addr> multicast group.
+ * Please read the documentation of ACE_SOCK_Dgram_Mcast for more
+ * details.
+ */
int subscribe (const ACE_INET_Addr &mcast_addr,
int reuse_addr = 1,
const ACE_TCHAR *net_if = 0,
int protocol_family = PF_INET,
int protocol = 0);
- // Start receiving data for the <mcast_addr> multicast group.
- // Please read the documentation of <ACE_SOCK_Dgram_Mcast> for more
- // details.
// The class can be used with a Reactor or using blocking I/O
// depending on what method of the following two is called.
+ //! Wait for events for the period <tv>. If <tv> is zero it blocks
+ //! forever.
int handle_events (ACE_Time_Value *tv = 0);
- // Wait for events for the period <tv>. If <tv> is zero it blocks
- // forever.
+ //! Register any event handlers into <reactor>
+ /*!
+ * @@TODO: This should be left for the clients of the class, there
+ * is no reason why this class must know about reactors.
+ */
int register_handlers (ACE_Reactor *reactor);
- // Register any event handlers into <reactor>
+ //! Remove all the handlers from the reactor
+ /*!
+ * @@TODO: This should be left for the clients of the class, there
+ * is no reason why this class must know about reactors.
+ */
int remove_handlers (void);
- // Remove all the handlers from the reactor
+ //! There is data to read, read it and process it.
int handle_input (ACE_HANDLE h);
- // There is data to read, read it and process it.
+ //! Obtain the handle for the underlying socket
ACE_HANDLE get_handle (void) const;
- // Obtain the handle for the underlying socket
- // Send back to the remove object represented by <proxy>
+ //@{
+ //! Send the message to the ACE_INET_Addr argument.
+ /*!
+ * These methods are used in the implementation of the
+ * ACE_RMCast_UDP_Proxy objects and the implementation of the
+ * inherited ACE_RMCast_Module methods in this class.
+ */
int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &);
int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &);
int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &);
@@ -78,8 +99,9 @@ public:
int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &);
int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &);
int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &);
+ //@}
- // = The RMCast_Module methods
+ // Please read the documentation in ACE_RMCast_Module for more details
virtual int data (ACE_RMCast::Data &);
virtual int poll (ACE_RMCast::Poll &);
virtual int ack_join (ACE_RMCast::Ack_Join &);
@@ -87,23 +109,24 @@ public:
virtual int ack (ACE_RMCast::Ack &);
virtual int join (ACE_RMCast::Join &);
virtual int leave (ACE_RMCast::Leave &);
- // The messages are sent to the multicast group
private:
+ //! The factory used to create the modules attached to each proxy
ACE_RMCast_Module_Factory *factory_;
- // The factory used to create the modules attached to each proxy
+ //! The multicast group we subscribe and send to
ACE_INET_Addr mcast_group_;
- // The multicast group we subscribe and send to
+ //! The socket used to receive and send data
ACE_SOCK_Dgram_Mcast dgram_;
- // The socket
+ //! Use a Hash_Map to maintain the collection of proxies
typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map;
+ //! The collection of proxies
Map map_;
+ //! The event handler adapter
ACE_RMCast_UDP_Event_Handler eh_;
- // The event handler adapter
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Membership.cpp b/ace/RMCast/RMCast_Membership.cpp
index 6ee2690a41f..a23d7a756e5 100644
--- a/ace/RMCast/RMCast_Membership.cpp
+++ b/ace/RMCast/RMCast_Membership.cpp
@@ -28,14 +28,14 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
ACE_RMCast::Ack next_ack;
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- if (ack.highest_in_sequence < this->highest_in_sequence_)
+ if (ack.next_expected < this->next_expected_)
{
// @@ This violates an invariant of the class, shouldn't
// happen...
// ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n"));
return -1;
}
- else if (ack.highest_in_sequence == this->highest_in_sequence_)
+ else if (ack.next_expected == this->next_expected_)
{
// Nothing new, just continue....
// ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n"));
@@ -43,21 +43,23 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
}
// Possible update, re-evaluate the story...
- ACE_UINT32 highest_in_sequence = (*i)->highest_in_sequence ();
+ ACE_UINT32 next_expected = (*i)->next_expected ();
ACE_UINT32 highest_received = (*i)->highest_received ();
++i;
for (; i != end; ++i)
{
- ACE_UINT32 s = (*i)->highest_in_sequence ();
- if (s < highest_in_sequence)
- highest_in_sequence = s;
+ ACE_UINT32 s = (*i)->next_expected ();
+ if (s < next_expected)
+ next_expected = s;
ACE_UINT32 r = (*i)->highest_received ();
if (r > highest_received)
highest_received = r;
}
#if 0
- if (this->highest_in_sequence_ >= highest_in_sequence
+ // @@TODO: this is an important feature, disabled until it is
+ // fully debugged
+ if (this->next_expected_ >= next_expected
|| this->highest_received_ >= highest_received)
{
// No change....
@@ -65,12 +67,12 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
return 0;
}
#endif /* 0 */
- this->highest_in_sequence_ = highest_in_sequence;
+ this->next_expected_ = next_expected;
this->highest_received_ = highest_received;
if (this->next () == 0)
return 0;
next_ack.source = ack.source;
- next_ack.highest_in_sequence = this->highest_in_sequence_;
+ next_ack.next_expected = this->next_expected_;
next_ack.highest_received = this->highest_received_;
}
// @@ This looks like a race condition, next() is checked inside the
@@ -89,6 +91,8 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join)
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
if (this->proxies_.insert (join.source) == -1)
return -1;
+ // @@TODO: This may change the next Ack to send up, should
+ // recompute and send the right message if that was the case.
}
return this->ACE_RMCast_Module::join (join);
@@ -103,6 +107,8 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
(void) this->proxies_.remove (leave.source);
+ // @@TODO: This may change the next Ack to send up, should
+ // recompute and send the right message if that was the case.
}
return this->ACE_RMCast_Module::leave (leave);
diff --git a/ace/RMCast/RMCast_Membership.h b/ace/RMCast/RMCast_Membership.h
index a99a7752507..21ee1bea97b 100644
--- a/ace/RMCast/RMCast_Membership.h
+++ b/ace/RMCast/RMCast_Membership.h
@@ -28,41 +28,59 @@
class ACE_RMCast_Proxy;
+//! Track peer membership
+/*!
+ * Reliable senders of events need to know exactly how many peers are
+ * receiving the events, and how many events has each peer received so
+ * far.
+ * This class uses the Join, Leave and Ack messages to build that
+ * information, it also summarizes the Ack events and propagate only
+ * the global info to the upper layer.
+ */
class ACE_RMCast_Export ACE_RMCast_Membership : public ACE_RMCast_Module
{
- // = TITLE
- // Track Receiver membership
- //
- // = DESCRIPTION
- // Define the interface for all reliable multicast membership
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Membership (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Membership (void);
- // Destructor
- // = The RMCast_Module methods
+ //! Receive an process an Ack message
+ /*!
+ * After receiving the Ack message we find out what is the lowest
+ * sequence number received in order among all the acks received by
+ * the proxies in the collection. We also find out what is the
+ * highest sequence number received by any proxy.
+ * We only propagate that information back to the upper layer, and
+ * then only if there are any news since the last Ack.
+ */
virtual int ack (ACE_RMCast::Ack &);
+
+ //! Add a new member to the collection, using the <source> field in
+ //! the Join message
virtual int join (ACE_RMCast::Join &);
+
+ //! Remove a member from the collection, using the <source> field in
+ //! the Join message
virtual int leave (ACE_RMCast::Leave &);
protected:
+ //! Use an unbounded set to maintain the collection of proxies.
typedef ACE_Unbounded_Set<ACE_RMCast_Proxy*> Proxy_Collection;
typedef ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*> Proxy_Iterator;
+ //! The collection of proxies
Proxy_Collection proxies_;
- // The membership buffer
- ACE_UINT32 highest_in_sequence_;
- // The smallest value of <highest_in_sequence> for all the proxies
+ //! The smallest value of \param next_expected for all the proxies
+ ACE_UINT32 next_expected_;
+ //! The highest value of \param highest_received for all the proxies
ACE_UINT32 highest_received_;
- // The highest value of <highest_received> for all the proxies
+ //! Synchronization
ACE_SYNCH_MUTEX mutex_;
- // Synchronization
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Membership.i b/ace/RMCast/RMCast_Membership.i
index 0c3e33c2d01..b513c2d5141 100644
--- a/ace/RMCast/RMCast_Membership.i
+++ b/ace/RMCast/RMCast_Membership.i
@@ -2,7 +2,7 @@
ACE_INLINE
ACE_RMCast_Membership::ACE_RMCast_Membership (void)
- : highest_in_sequence_ (0)
+ : next_expected_ (0)
, highest_received_ (0)
{
}
diff --git a/ace/RMCast/RMCast_Module.h b/ace/RMCast/RMCast_Module.h
index dc4077fa4ab..fad76caac53 100644
--- a/ace/RMCast/RMCast_Module.h
+++ b/ace/RMCast/RMCast_Module.h
@@ -36,55 +36,55 @@ class ACE_Time_Value;
class ACE_RMCast_Export ACE_RMCast_Module
{
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Module (void);
- //!< Constructor
+ //! Destructor
virtual ~ACE_RMCast_Module (void);
- //!< Destructor
+ //! Modifier for the next element in the stack
virtual int next (ACE_RMCast_Module *next);
- //!< Modifier for the next element in the stack
+ //! Accesor for the next element in the stack
virtual ACE_RMCast_Module* next (void) const;
- //!< Accesor for the next element in the stack
+ //! Modifier for the previous element in the stack
virtual int prev (ACE_RMCast_Module *prev);
- //!< Modifier for the previous element in the stack
+ //! Accesor for the previous element in the stack
virtual ACE_RMCast_Module* prev (void) const;
- //!< Accesor for the previous element in the stack
+ //! Initialize the module, setting up the next module
virtual int open (void);
- //!< Initialize the module, setting up the next module
+ //! Close the module.
virtual int close (void);
- //!< Close the module.
+ //! Push data through the stack
virtual int data (ACE_RMCast::Data &);
- //!< Push data through the stack
+ //! Push a polling request through the stack
virtual int poll (ACE_RMCast::Poll &);
- //!< Push a polling request through the stack
+ //! Push a message to ack a join request through the stack
virtual int ack_join (ACE_RMCast::Ack_Join &);
- //!< Push a message to ack a join request through the stack
+ //! Push a message to ack a leave request through the stack
virtual int ack_leave (ACE_RMCast::Ack_Leave &);
- //!< Push a message to ack a leave request through the stack
+ //! Push an ack mesage through the stack
virtual int ack (ACE_RMCast::Ack &);
- //!< Push an ack mesage through the stack
+ //! Push a join message through the stack
virtual int join (ACE_RMCast::Join &);
- //!< Push a join message through the stack
+ //! Push a leave message through the stack
virtual int leave (ACE_RMCast::Leave &);
- //!< Push a leave message through the stack
private:
//! The next element in the stack
ACE_RMCast_Module *next_;
+
//! The previous element in the stack
ACE_RMCast_Module *prev_;
};
diff --git a/ace/RMCast/RMCast_Module_Factory.h b/ace/RMCast/RMCast_Module_Factory.h
index 722ad87d678..f0ea58df0e5 100644
--- a/ace/RMCast/RMCast_Module_Factory.h
+++ b/ace/RMCast/RMCast_Module_Factory.h
@@ -27,19 +27,40 @@
class ACE_RMCast_Module;
class ACE_RMCast_IO_UDP;
+//! Create Module stacks
+/*!
+ * Different application will probably require different
+ * configurations in their Module stack, some will just want best
+ * effort semantics. Others will use Reliable communication with a
+ * maximum retransmission time. Furthermore, applications may want to
+ * receive messages in send order, or just as soon as they are
+ * received.
+ * Obviously most applications will want to change want happens once a
+ * message is completely received.
+ *
+ * To achieve all this flexibility the IO layer uses this factory to
+ * create the full stack of Modules corresponding to a single
+ * consumer.
+ * To keep the complexity under control the intention is to create
+ * helper Factories, such as Reliable_Module_Factory where
+ * applications only need to customize a few features.
+ */
class ACE_RMCast_Export ACE_RMCast_Module_Factory
{
- // = DESCRIPTION
- //
public:
+ //! Destructor
virtual ~ACE_RMCast_Module_Factory (void);
- // Destructor
+ //! Create a new proxy
virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0;
- // Create a new proxy
+ //! Destroy a proxy
+ /*!
+ * Some factories may allocate modules from a pool, or return the
+ * same module for all proxies. Consequently, only the factory
+ * knows how to destroy them.
+ */
virtual void destroy (ACE_RMCast_Module *) = 0;
- // Destroy a proxy
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Partial_Message.h b/ace/RMCast/RMCast_Partial_Message.h
index 9b71eb4a541..88fa9ab2f1a 100644
--- a/ace/RMCast/RMCast_Partial_Message.h
+++ b/ace/RMCast/RMCast_Partial_Message.h
@@ -26,44 +26,72 @@
#define ACE_RMCAST_DEFAULT_HOLE_COUNT 16
#endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */
+//! Represent a partially received message in the
+//! ACE_RMCast_Reassembly module
+/*!
+ * This class provides temporary storage for the fragments as they are
+ * received in the ACE_RMCast_Reassembly module. It also keeps track
+ * of what portions of the message are still missing.
+ */
class ACE_RMCast_Export ACE_RMCast_Partial_Message
{
public:
+ //! Constructor, reserve enough memory for the complete message
ACE_RMCast_Partial_Message (ACE_UINT32 message_size);
+
+ //! Destructor
~ACE_RMCast_Partial_Message (void);
+ //! Process a fragment
+ /*!
+ * A fragment starting at <offset> has been received, copy the
+ * fragment contents and update the list of holes.
+ */
int fragment_received (ACE_UINT32 message_size,
ACE_UINT32 offset,
ACE_Message_Block *mb);
+
+ //! Return 1 if the message is complete
int is_complete (void) const;
+ //! Return the body of the message, the memory is *not* owned by the
+ //! caller
ACE_Message_Block *message_body (void);
- // Return the body of the message, the memory is owned by the
- // class.
private:
+ //! Insert a new hole into the list
+ /*!
+ * The class keeps an array to represent the missing portions of the
+ * message. This method inserts a new hole, i.e. a new element in
+ * the array at index <i>. The <start> and <end> arguments represent
+ * the offsets of the missing portion of the message.
+ */
int insert_hole (size_t i,
ACE_UINT32 start,
ACE_UINT32 end);
- // Insert a new hole into the list
+ //! Remove a hole from the list
int remove_hole (size_t i);
- // Remove a hole from the list
private:
+ //! Maintain the message storage
ACE_Message_Block message_body_;
- // Used to rebuild the body of the message
+ //! Represent a missing portion of a message
struct Hole
{
+ //! Offset where the missing portion of the message starts
ACE_UINT32 start;
+ //! Offset where the missing portion of the message ends
ACE_UINT32 end;
};
+ //! Implement a growing array of Hole structures
+ //@{
Hole *hole_list_;
size_t max_hole_count_;
size_t hole_count_;
- // The current list of holes in the message_body.
+ //@}
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Proxy.cpp b/ace/RMCast/RMCast_Proxy.cpp
index 53d9d0b6726..f6b2bbec5e5 100644
--- a/ace/RMCast/RMCast_Proxy.cpp
+++ b/ace/RMCast/RMCast_Proxy.cpp
@@ -15,9 +15,9 @@ ACE_RMCast_Proxy::~ACE_RMCast_Proxy (void)
}
ACE_UINT32
-ACE_RMCast_Proxy::highest_in_sequence (void) const
+ACE_RMCast_Proxy::next_expected (void) const
{
- return this->highest_in_sequence_;
+ return this->next_expected_;
}
ACE_UINT32
@@ -29,7 +29,7 @@ ACE_RMCast_Proxy::highest_received (void) const
int
ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack)
{
- this->highest_in_sequence_ = ack.highest_in_sequence;
+ this->next_expected_ = ack.next_expected;
this->highest_received_ = ack.highest_received;
return this->ACE_RMCast_Module::ack (ack);
}
diff --git a/ace/RMCast/RMCast_Proxy.h b/ace/RMCast/RMCast_Proxy.h
index 414b74174fb..e0e6afe79b1 100644
--- a/ace/RMCast/RMCast_Proxy.h
+++ b/ace/RMCast/RMCast_Proxy.h
@@ -48,27 +48,28 @@ public:
//! Destructor
virtual ~ACE_RMCast_Proxy (void);
-
- //! Return the highest sequence number received without any losses
- //! before it. Only applies to remote receiver proxies.
+
+ //! Return the next sequence number expected by the peer. Only
+ //! applies to remote receiver proxies.
/*!
- Please read the documentation in ACE_RMCast::Ack
+ * Please read the documentation in ACE_RMCast::Ack
*/
- virtual ACE_UINT32 highest_in_sequence (void) const;
+ virtual ACE_UINT32 next_expected (void) const;
//! Return the highest sequence number successfully received.
//! Only applies to remote receiver proxies.
/*!
- Please read the documentation in ACE_RMCast::Ack
+ * Please read the documentation in ACE_RMCast::Ack
*/
virtual ACE_UINT32 highest_received (void) const;
//@{
//! Send messages directly to the peer.
- /*! Send a message directly to the peer, i.e. the message is not
- sent through the multicast group and it may not be processed by
- all the layers in the stack.
- */
+ /*!
+ * Send a message directly to the peer, i.e. the message is not
+ * sent through the multicast group and it may not be processed by
+ * all the layers in the stack.
+ */
virtual int reply_data (ACE_RMCast::Data &) = 0;
virtual int reply_poll (ACE_RMCast::Poll &) = 0;
virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0;
@@ -79,8 +80,8 @@ public:
//@}
/*!
- Proxies process the ACK sequence numbers to save the sequence
- numbers reported from the remote peer.
+ * Proxies process the ACK sequence numbers to cache the ack
+ * information from the peer.
*/
virtual int ack (ACE_RMCast::Ack &);
@@ -88,7 +89,7 @@ private:
//@{
//! Cache the sequence numbers reported from the remote peer using
//! Ack messages
- ACE_UINT32 highest_in_sequence_;
+ ACE_UINT32 next_expected_;
ACE_UINT32 highest_received_;
//@}
};
diff --git a/ace/RMCast/RMCast_Proxy.i b/ace/RMCast/RMCast_Proxy.i
index f93feaa5639..6fee09fe9e5 100644
--- a/ace/RMCast/RMCast_Proxy.i
+++ b/ace/RMCast/RMCast_Proxy.i
@@ -2,7 +2,7 @@
ACE_INLINE
ACE_RMCast_Proxy::ACE_RMCast_Proxy (void)
- : highest_in_sequence_ (0)
+ : next_expected_ (0)
, highest_received_ (0)
{
}
diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp
index a996e1204d5..7e38cdf7c97 100644
--- a/ace/RMCast/RMCast_Retransmission.cpp
+++ b/ace/RMCast/RMCast_Retransmission.cpp
@@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void)
{
}
+class ACE_RMCast_Resend_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next,
+ ACE_UINT32 max_sequence_number)
+ : n (0)
+ , next_ (next)
+ , max_sequence_number_ (max_sequence_number)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const & item)
+ {
+ if (key > this->max_sequence_number_)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::resend - message %d resent\n",
+ key));
+ ACE_RMCast::Data data = item;
+ int r = this->next_->data (data);
+ if (r != 0)
+ return r;
+ n++;
+ return 0;
+ }
+
+ int n;
+
+private:
+ ACE_RMCast_Module *next_;
+
+ ACE_UINT32 max_sequence_number_;
+};
+
int
-ACE_RMCast_Retransmission::close (void)
+ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number)
{
- Messages_Iterator end = this->messages_.end ();
+ if (this->next () == 0)
+ return 0;
- for (Messages_Iterator i = this->messages_.begin ();
- i != end;
- ++i)
- {
- ACE_Message_Block::release ((*i).item ().payload);
- }
- this->messages_.close ();
+ ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number);
+
+ if (this->messages_.for_each (&worker) == -1)
+ return -1;
+
+ return worker.n;
+}
+
+int
+ACE_RMCast_Retransmission::close (void)
+{
+ // @@
return 0;
}
@@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data)
int r = this->next ()->data (data);
if (r == 0)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
ACE_RMCast::Data copy = data;
copy.payload = ACE_Message_Block::duplicate (data.payload);
r = this->messages_.bind (data.sequence_number, copy);
@@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
ACE_RMCast::Ack_Join ack_join;
+#if 0
+ // @@
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
Messages_Iterator end = this->messages_.end ();
Messages_Iterator begin = this->messages_.begin ();
@@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
ack_join.next_sequence_number = (*begin).key ();
}
}
+#endif
(void) join.source->reply_ack_join (ack_join);
// @@ We should force a full retransmission of all the messages!
@@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
}
+class ACE_RMCast_Ack_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack,
+ ACE_RMCast_Retransmission::Messages::Write_Guard &g,
+ ACE_RMCast_Retransmission::Messages *messages)
+ : ack_ (ack)
+ , ace_mon_ (g)
+ , messages_ (messages)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const &)
+ {
+ if (key >= this->ack_.next_expected)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::ack - message %d erased\n",
+ key));
+ return this->messages_->unbind_i (this->ace_mon_, key);
+ }
+
+private:
+ ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&);
+ ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&);
+
+private:
+ ACE_RMCast::Ack &ack_;
+
+ ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_;
+
+ ACE_RMCast_Retransmission::Messages *messages_;
+};
+
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- for (Messages_Iterator i = this->messages_.begin ();
- i != this->messages_.end ();
- /* do nothing */)
- {
- ACE_UINT32 key = (*i).key ();
- if (key > ack.highest_in_sequence)
- break;
- this->messages_.unbind (key);
- }
- return 0;
+ Messages::Write_Guard ace_mon (this->messages_.mutex_,
+ this->messages_.cond_,
+ this->messages_.pending_writes_,
+ this->messages_.writing_,
+ this->messages_.collection_);
+
+ ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
+
+ return this->messages_.for_each (&worker);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
@@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC
template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
+template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>;
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_Retransmission.h b/ace/RMCast/RMCast_Retransmission.h
index 7c586fe5dd6..b7bc20d2914 100644
--- a/ace/RMCast/RMCast_Retransmission.h
+++ b/ace/RMCast/RMCast_Retransmission.h
@@ -19,6 +19,7 @@
#include "ace/pre.h"
#include "RMCast_Module.h"
+#include "RMCast_Copy_On_Write.h"
#include "ace/RB_Tree.h"
#include "ace/Synch.h"
@@ -26,38 +27,72 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+//! Store messages for retransmission in reliable configurations
+/*!
+ * Reliable configurations of the RMCast framework need to store
+ * messages on the sender side to resend them if one or more clients
+ * do not receive them successfully.
+ */
class ACE_RMCast_Export ACE_RMCast_Retransmission : public ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Retransmission
- //
- // = DESCRIPTION
- // Define the interface for all reliable multicast retransmission
public:
// = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Retransmission (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Retransmission (void);
- // Destructor
- // = The RMCast_Module methods
+ //! Use a Red-Black Tree to keep the queue of messages
+ typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection;
+ typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection_Iterator;
+
+ //! The messages are stored in the Copy_On_Write wrapper to provide
+ //! an efficient, but thread safe interface.
+ typedef ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,Collection,Collection_Iterator> Messages;
+
+ //! Resend messages
+ /*!
+ * Resends all the messages up to \param max_sequence_number
+ * Returns the number of messages sent, or -1 if there where any
+ * errors.
+ */
+ int resend (ACE_UINT32 max_sequence_number);
+
+ //! Cleanup all the stored messages
virtual int close (void);
+
+ //! Pass the message downstream, but also save it in the
+ //! retransmission queue
+ /*!
+ * Sequence number are assigned by the ACE_RMCast_Fragmentation
+ * class, consequently this class first passes the message
+ * downstream, to obtain the sequence number and then stores the
+ * message for later retransmission.
+ */
virtual int data (ACE_RMCast::Data &data);
+
+ //! Process an Ack message from the remote receivers.
+ /*!
+ * Normally this Ack message will be a summary of all the Ack
+ * messages received by the ACE_RMCast_Membership class
+ */
virtual int ack (ACE_RMCast::Ack &);
+
+ //! Detect when new members join the group and Ack_Join them
+ /*!
+ * When a new receiver joins the group this module sends an Ack_Join
+ * message with the next sequence number that the receiver should
+ * expect.
+ * The sequence number is obtained from the current list of cached
+ * messages.
+ */
virtual int join (ACE_RMCast::Join &);
protected:
- typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>
- Messages;
- typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>
- Messages_Iterator;
+ //! The retransmission buffer
Messages messages_;
- // The retransmission buffer
-
- ACE_SYNCH_MUTEX mutex_;
- // Synchronization
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_UDP_Event_Handler.h b/ace/RMCast/RMCast_UDP_Event_Handler.h
index 02798cee7f8..2a6e7c45d42 100644
--- a/ace/RMCast/RMCast_UDP_Event_Handler.h
+++ b/ace/RMCast/RMCast_UDP_Event_Handler.h
@@ -1,16 +1,5 @@
// $Id$
-// ============================================================================
-//
-// = DESCRIPTION
-// Implement an adapter between the ACE Reactor and the
-// ACE_RMCast_IO_UDP
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
#ifndef ACE_RMCAST_UDP_EVENT_HANDLER_H
#define ACE_RMCAST_UDP_EVENT_HANDLER_H
#include "ace/pre.h"
@@ -25,24 +14,41 @@
class ACE_RMCast_IO_UDP;
class ACE_INET_Addr;
+//! Implement an Adapter for the ACE_RMCast_IO_UDP class
+/*!
+ * Applications may wish to use the ACE_Reactor to demultiplex I/O
+ * events for an ACE_RMCast_IO_UDP object. However other application
+ * may choose to make ACE_RMCast_IO_UDP active, or they may dedicate
+ * their own threads for its events.
+ * To avoid couplin ACE_RMCast_IO_UDP with the Reactor we don't make
+ * it derived from ACE_Event_Handler or any other class in the Reactor
+ * framework, instead, this simple Adapter can forward the Reactor
+ * messages to an ACE_RMCast_IO_UDP object.
+ */
class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler
{
public:
- ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver);
- // Constructor
-
+ //! Constructor, save io_udp as the Adaptee in the Adapter pattern.
+ ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io_udp);
+
+ //! Destructor
+ /*!
+ * Notice that this class does not own the ACE_RMCast_IO_UDP
+ * adaptee, so it does not destroy it.
+ */
~ACE_RMCast_UDP_Event_Handler (void);
- // Destructor
- // = The Event_Handler methods
+ //@{
+ //! Documented in ACE_Event_Handler class
virtual ACE_HANDLE get_handle (void) const;
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_timeout (const ACE_Time_Value &current_time,
const void *act = 0);
+ //@}
private:
+ //! The adaptee
ACE_RMCast_IO_UDP *io_udp_;
- // The sender
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_UDP_Proxy.cpp b/ace/RMCast/RMCast_UDP_Proxy.cpp
index 010267b0cbb..2eb0983b171 100644
--- a/ace/RMCast/RMCast_UDP_Proxy.cpp
+++ b/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -123,7 +123,7 @@ ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size)
ACE_OS::memcpy (&tmp, buffer + 1,
sizeof(tmp));
- ack.highest_in_sequence = ACE_NTOHL (tmp);
+ ack.next_expected = ACE_NTOHL (tmp);
ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32),
sizeof(tmp));
ack.highest_received = ACE_NTOHL (tmp);
@@ -175,4 +175,3 @@ ACE_RMCast_UDP_Proxy::reply_leave (ACE_RMCast::Leave &leave)
{
return this->io_udp_->send_leave (leave, this->peer_addr_);
}
-
diff --git a/ace/RMCast/RMCast_Worker.cpp b/ace/RMCast/RMCast_Worker.cpp
new file mode 100644
index 00000000000..06254b8c0f6
--- /dev/null
+++ b/ace/RMCast/RMCast_Worker.cpp
@@ -0,0 +1,19 @@
+// $Id$
+
+#ifndef ACE_RMCAST_WORKER_CPP
+#define ACE_RMCAST_WORKER_CPP
+
+#include "RMCast_Worker.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "RMCast_Worker.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(RMCast, RMCast_Worker, "$Id$")
+
+template<class KEY, class ITEM>
+ACE_RMCast_Worker<KEY,ITEM>::~ACE_RMCast_Worker (void)
+{
+}
+
+#endif /* ACE_RMCAST_WORKER_CPP */
diff --git a/ace/RMCast/RMCast_Worker.h b/ace/RMCast/RMCast_Worker.h
new file mode 100644
index 00000000000..d3eb3032ebc
--- /dev/null
+++ b/ace/RMCast/RMCast_Worker.h
@@ -0,0 +1,36 @@
+/* -*- C++ -*- */
+// $Id$
+//
+
+#ifndef ACE_RMCAST_WORKER_H
+#define ACE_RMCAST_WORKER_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class KEY, class ITEM>
+class ACE_RMCast_Worker
+{
+public:
+ virtual ~ACE_RMCast_Worker (void);
+
+ virtual int work (KEY const & key,
+ ITEM const & item) = 0;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Worker.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Worker.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Worker.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_RMCAST_WORKER_H */
diff --git a/ace/RMCast/RMCast_Worker.i b/ace/RMCast/RMCast_Worker.i
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/ace/RMCast/RMCast_Worker.i
@@ -0,0 +1 @@
+// $Id$
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile
index ae13792c4c4..e97cd885493 100644
--- a/protocols/ace/RMCast/Makefile
+++ b/protocols/ace/RMCast/Makefile
@@ -636,17 +636,11 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Basic_Types.i \
$(ACE_ROOT)/ace/Trace.h \
$(ACE_ROOT)/ace/OS.i \
- RMCast_Export.h RMCast.i RMCast_Module.i \
- $(ACE_ROOT)/ace/RB_Tree.h \
- $(ACE_ROOT)/ace/Functor.h \
+ RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \
+ RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/ACE.h \
$(ACE_ROOT)/ace/ACE.i \
- $(ACE_ROOT)/ace/Functor.i \
- $(ACE_ROOT)/ace/Functor_T.h \
- $(ACE_ROOT)/ace/Functor_T.i \
- $(ACE_ROOT)/ace/Functor_T.cpp \
- $(ACE_ROOT)/ace/RB_Tree.i \
- $(ACE_ROOT)/ace/Synch.h \
$(ACE_ROOT)/ace/Synch.i \
$(ACE_ROOT)/ace/Synch_T.h \
$(ACE_ROOT)/ace/Event_Handler.h \
@@ -664,6 +658,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Log_Record.h \
$(ACE_ROOT)/ace/Log_Priority.h \
$(ACE_ROOT)/ace/Log_Record.i \
+ RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
$(ACE_ROOT)/ace/Malloc.h \
$(ACE_ROOT)/ace/Malloc_Base.h \
$(ACE_ROOT)/ace/Based_Pointer_T.h \
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h
index abf2a24e946..df3a0d48858 100644
--- a/protocols/ace/RMCast/RMCast.h
+++ b/protocols/ace/RMCast/RMCast.h
@@ -233,16 +233,19 @@ public:
*
* This message is used to provide feedback information to senders.
* It contains two sequence numbers:
- * - highest_in_sequence: is the sequence number of the last message
- * received without any lost messages before it
- * - highest_received: is the sequence number of the last_message
- * successfully received, there may be some messages lost before it
+ * - \param next_expected: is the sequence number of the next message
+ * expected, i.e. (next_expected-1) is the last message received
+ * without any losses before it.
+ * - \param highest_received: is the highest sequence number among
+ * all the messages successfully received.
+ * In other words, all messages lost (if any) are in the range:
+ * [next_expected,highest_received)
*
* <CODE>
* +---------+----------------------+<BR>
* | 8 bits | MT_ACK |<BR>
* +---------+----------------------+<BR>
- * | 32 bits | highest_in_sequence |<BR>
+ * | 32 bits | next_expected |<BR>
* +---------+----------------------+<BR>
* | 32 bits | highest_received |<BR>
* +---------+----------------------+<BR>
@@ -251,7 +254,7 @@ public:
struct Ack
{
//! The last message received without any losses before it.
- ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 next_expected;
//! The last message successfully received
ACE_UINT32 highest_received;
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
new file mode 100644
index 00000000000..f1553c7f4ab
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -0,0 +1,176 @@
+// $Id$
+
+#ifndef ACE_RMCAST_COPY_ON_WRITE_CPP
+#define ACE_RMCAST_COPY_ON_WRITE_CPP
+
+#include "RMCast_Copy_On_Write.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "RMCast_Copy_On_Write.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(RMCast, RMCast_Copy_On_Write, "$Id$")
+
+template<class COLLECTION, class ITERATOR> void
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_incr_refcnt (void)
+{
+ // LOCKING: no locking is required, the caller grabs the mutex.
+ this->refcount_++;
+}
+
+template<class COLLECTION, class ITERATOR> void
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
+{
+ // LOCKING: no locking is required, the caller grabs the mutex.
+ {
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return;
+ }
+ //@@ TODO: If this wrapper is going to be completely general some
+ // kind of functor has to be provided to remove the elements in the
+ // collection, in case the are no self-managed
+
+ delete this;
+}
+
+// ****************************************************************
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write (void)
+ : pending_writes_ (0)
+ , writing_ (0)
+ , cond_ (mutex_)
+{
+ ACE_NEW (this->collection_, Collection);
+}
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write (void)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+
+ while (this->pending_writes_ != 0)
+ this->cond_.wait ();
+
+ this->collection_->_decr_refcnt ();
+ this->collection_ = 0;
+}
+
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
+ for_each (ACE_RMCast_Worker<KEY,ITEM> *worker)
+{
+ Read_Guard ace_mon (this->mutex_, this->collection_);
+
+ ITERATOR end = ace_mon.collection->collection.end ();
+ for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
+ {
+ int r = worker->work ((*i).key (), (*i).item ());
+ if (r != 0)
+ return r;
+ }
+ return 0;
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
+ ITEM const & i)
+{
+ Write_Guard ace_mon (this->mutex_,
+ this->cond_,
+ this->pending_writes_,
+ this->writing_,
+ this->collection_);
+
+ return this->bind_i (ace_mon, k, i);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k)
+{
+ Write_Guard ace_mon (this->mutex_,
+ this->cond_,
+ this->pending_writes_,
+ this->writing_,
+ this->collection_);
+
+ return this->unbind_i (ace_mon, k);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon,
+ KEY const & k,
+ ITEM const & i)
+{
+ return ace_mon.copy->collection.bind (k, i);
+}
+
+template<class KEY, class ITEM, class C, class I> int
+ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon,
+ KEY const & k)
+{
+ return ace_mon.copy->collection.unbind (k);
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m,
+ ACE_SYNCH_CONDITION &c,
+ int &p,
+ int &w,
+ Collection*& cr)
+ : copy (0)
+ , mutex (m)
+ , cond (c)
+ , pending_writes (p)
+ , writing_flag (w)
+ , collection (cr)
+{
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
+
+ this->pending_writes++;
+
+ while (this->writing_flag != 0)
+ this->cond.wait ();
+
+ this->writing_flag = 1;
+ }
+
+ // Copy outside the mutex, because it may take a long time.
+ // Nobody can change it, because it is protected by the
+ // writing_flag.
+
+ // First initialize it (with the correct reference count
+ ACE_NEW (this->copy, Collection);
+ // Copy the contents
+ this->copy->collection = this->collection->collection;
+}
+
+template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write_Write_Guard (void)
+{
+ Collection *tmp = 0;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
+
+ tmp = this->collection;
+ this->collection = this->copy;
+ this->writing_flag = 0;
+ this->pending_writes--;
+
+ this->cond.signal ();
+ }
+ // Delete outside the mutex, because it may take a long time.
+ tmp->_decr_refcnt ();
+}
+
+// ****************************************************************
+
+#endif /* ACE_RMCAST_COPY_ON_WRITE_CPP */
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
new file mode 100644
index 00000000000..8724e23a5d5
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
@@ -0,0 +1,173 @@
+/* -*- C++ -*- */
+// $Id$
+//
+
+#ifndef ACE_RMCAST_COPY_ON_WRITE_H
+#define ACE_RMCAST_COPY_ON_WRITE_H
+#include "ace/pre.h"
+
+#include "RMCast_Worker.h"
+#include "ace/Synch.h"
+
+//! A wrapper to implement reference counted collections
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Collection
+{
+public:
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Collection (void);
+
+ //! Increment the reference count
+ void _incr_refcnt (void);
+
+ //! Decrement the reference count
+ void _decr_refcnt (void);
+
+ //! The actual collection
+ COLLECTION collection;
+
+private:
+ //! The reference count
+ ACE_UINT32 refcount_;
+};
+
+// ****************************************************************
+
+//! Implement a read guard for a reference counted collection
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Read_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex,
+ Collection *&collection);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void);
+
+ //! A reference to the collection
+ Collection *collection;
+
+private:
+ //! Synchronization
+ ACE_SYNCH_MUTEX &mutex_;
+};
+
+// ****************************************************************
+
+//! Implement the write guard for a reference counted collecion
+/*!
+ * This helper class atomically increments the reference count of a
+ * ACE_RMCast_Copy_On_Write_Collection and reads the current
+ * collection in the Copy_On_Write class.
+ */
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Write_Guard
+{
+public:
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex,
+ ACE_SYNCH_CONDITION &cond,
+ int &pending_writes,
+ int &writing_flag,
+ Collection*& collection);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write_Write_Guard (void);
+
+ //! The collection
+ Collection *copy;
+
+private:
+ //! Keep a reference to the mutex
+ ACE_SYNCH_MUTEX &mutex;
+
+ //! Keep a reference to the condition variable
+ ACE_SYNCH_CONDITION &cond;
+
+ //! Use a reference to update the pending writes count
+ int &pending_writes;
+
+ //! Use a reference to update the writing flag
+ int &writing_flag;
+
+ //! Use this reference to update the collection once the
+ //! modifications are finished.
+ Collection *&collection;
+};
+
+// ****************************************************************
+
+//! Implement a copy on write wrapper for a map-like collection
+template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write
+{
+public:
+ //! The Read_Guard trait
+ typedef ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR> Read_Guard;
+
+ //! The Write_Guard trait
+ typedef ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR> Write_Guard;
+
+ //! The underlying collection type
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+ //! Constructor
+ ACE_RMCast_Copy_On_Write (void);
+
+ //! Destructor
+ ~ACE_RMCast_Copy_On_Write (void);
+
+ //! Iterate over all the elements invoking \param worker on each one.
+ int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker);
+
+ //! Add a new element
+ int bind (KEY const & key, ITEM const & item);
+
+ //! Remove an element
+ int unbind (KEY const & key);
+
+ //! Bind assuming the Write_Guard is held
+ int bind_i (Write_Guard &guard, KEY const & key, ITEM const & item);
+
+ //! Unbind assuming the Write_Guard is held
+ int unbind_i (Write_Guard &guard, KEY const & key);
+
+ //! Number of pending writes
+ int pending_writes_;
+
+ //! If non-zero then a thread is changing the collection.
+ /*!
+ * Many threads can use the collection simulatenously, but only one
+ * change it.
+ */
+ int writing_;
+
+ //! A mutex to serialize access to the collection pointer.
+ ACE_SYNCH_MUTEX mutex_;
+
+ //! A condition variable to wait to synchronize multiple writers.
+ ACE_SYNCH_CONDITION cond_;
+
+ //! The collection, with reference counting added
+ Collection *collection_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Copy_On_Write.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Copy_On_Write.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Copy_On_Write.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_COPY_ON_WRITE_H */
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
new file mode 100644
index 00000000000..c6e5099cda5
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
@@ -0,0 +1,36 @@
+// $Id$
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Collection (void)
+ : refcount_ (1)
+{
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
+ ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m,
+ Collection*& collection_ref)
+ : collection (0)
+ , mutex_ (m)
+{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->collection = collection_ref;
+ this->collection->_incr_refcnt ();
+}
+
+template<class COLLECTION, class ITERATOR> ACE_INLINE
+ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
+ ~ACE_RMCast_Copy_On_Write_Read_Guard (void)
+{
+ if (this->collection != 0)
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ this->collection->_decr_refcnt ();
+ }
+}
+
+// ****************************************************************
+
diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h
index 7b64d763ebc..eed08c92517 100644
--- a/protocols/ace/RMCast/RMCast_Fragment.h
+++ b/protocols/ace/RMCast/RMCast_Fragment.h
@@ -1,15 +1,5 @@
// $Id$
-// ============================================================================
-//
-// = DESCRIPTION
-// The fragmentation task for the reliable multicast library
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
#ifndef ACE_RMCAST_FRAGMENT_H
#define ACE_RMCAST_FRAGMENT_H
#include "ace/pre.h"
@@ -21,28 +11,46 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+//! Default fragment size
#ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE
# define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024
#endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */
+//! Fragmentation module
+/*!
+ * Some transports cannot send very big messages, for example UDP
+ * imposes a limit of 64K, and in practice the limit is even more
+ * strict than that.
+ * This class decomposes a message into multiple fragments, using an
+ * application defined maximum size.
+ */
class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_RMCast_Module
{
public:
+ //! Constructor
ACE_RMCast_Fragment (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Fragment (void);
- // Destructor
+ //! Accessor for the max_fragment size.
+ /*! There is no modifier, the maximum fragment size is obtained
+ * using feedback from the lower layers (transport?)
+ * @@TODO We have not implemented the feedback mechanisms yet!
+ */
size_t max_fragment_size (void) const;
- // Accessor for the max_fragment size.
- // There is no modifier, the maximum fragment size is obtained using
- // feedback from the lower layer (transport?)
- // = The ACE_RMCast_Module methods
+ /*!
+ * Only data messages need fragmentation, the control messages are
+ * all small enough for all the transports that I know about.
+ * Well, actually for CAN-Bus (Controller Area Network), they may be
+ * too big, because the max payload there is 8 bytes, but we don't
+ * play with those in ACE.
+ */
virtual int data (ACE_RMCast::Data &data);
private:
+ //! Current fragment size limit
size_t max_fragment_size_;
};
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
index af655f3130f..421982d5ad6 100644
--- a/protocols/ace/RMCast/RMCast_IO_UDP.cpp
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
@@ -354,7 +354,7 @@ ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
char header[16];
header[0] = ACE_RMCast::MT_ACK;
- ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_UINT32 tmp = ACE_HTONL (ack.next_expected);
ACE_OS::memcpy (header + 1,
&tmp, sizeof(ACE_UINT32));
tmp = ACE_HTONL (ack.highest_received);
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.h b/protocols/ace/RMCast/RMCast_IO_UDP.h
index bdcccabe6e1..5af403bf994 100644
--- a/protocols/ace/RMCast/RMCast_IO_UDP.h
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.h
@@ -33,44 +33,65 @@ class ACE_Time_Value;
class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module
{
public:
+ //! Constructor
+ /*!
+ * The <factory> argument is used to create the modules for each
+ * proxy that process incoming messages. The class does *not* assume
+ * ownership of <factory>, the caller owns it. But it does assume
+ * ownership of the modules returned by the factory, and it may ask
+ * the factory to release them eventually.
+ */
ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory);
- // Constructor
- // <factory> is used to create the modules for each proxy that
- // process incoming messages. The class does *not* assume ownership
- // of <factory>, the caller owns it.
+ //! Destructor
~ACE_RMCast_IO_UDP (void);
- // Destructor
+ //! Join a new multicast group
+ /*!
+ * Start receiving data for the <mcast_addr> multicast group.
+ * Please read the documentation of ACE_SOCK_Dgram_Mcast for more
+ * details.
+ */
int subscribe (const ACE_INET_Addr &mcast_addr,
int reuse_addr = 1,
const ACE_TCHAR *net_if = 0,
int protocol_family = PF_INET,
int protocol = 0);
- // Start receiving data for the <mcast_addr> multicast group.
- // Please read the documentation of <ACE_SOCK_Dgram_Mcast> for more
- // details.
// The class can be used with a Reactor or using blocking I/O
// depending on what method of the following two is called.
+ //! Wait for events for the period <tv>. If <tv> is zero it blocks
+ //! forever.
int handle_events (ACE_Time_Value *tv = 0);
- // Wait for events for the period <tv>. If <tv> is zero it blocks
- // forever.
+ //! Register any event handlers into <reactor>
+ /*!
+ * @@TODO: This should be left for the clients of the class, there
+ * is no reason why this class must know about reactors.
+ */
int register_handlers (ACE_Reactor *reactor);
- // Register any event handlers into <reactor>
+ //! Remove all the handlers from the reactor
+ /*!
+ * @@TODO: This should be left for the clients of the class, there
+ * is no reason why this class must know about reactors.
+ */
int remove_handlers (void);
- // Remove all the handlers from the reactor
+ //! There is data to read, read it and process it.
int handle_input (ACE_HANDLE h);
- // There is data to read, read it and process it.
+ //! Obtain the handle for the underlying socket
ACE_HANDLE get_handle (void) const;
- // Obtain the handle for the underlying socket
- // Send back to the remove object represented by <proxy>
+ //@{
+ //! Send the message to the ACE_INET_Addr argument.
+ /*!
+ * These methods are used in the implementation of the
+ * ACE_RMCast_UDP_Proxy objects and the implementation of the
+ * inherited ACE_RMCast_Module methods in this class.
+ */
int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &);
int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &);
int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &);
@@ -78,8 +99,9 @@ public:
int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &);
int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &);
int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &);
+ //@}
- // = The RMCast_Module methods
+ // Please read the documentation in ACE_RMCast_Module for more details
virtual int data (ACE_RMCast::Data &);
virtual int poll (ACE_RMCast::Poll &);
virtual int ack_join (ACE_RMCast::Ack_Join &);
@@ -87,23 +109,24 @@ public:
virtual int ack (ACE_RMCast::Ack &);
virtual int join (ACE_RMCast::Join &);
virtual int leave (ACE_RMCast::Leave &);
- // The messages are sent to the multicast group
private:
+ //! The factory used to create the modules attached to each proxy
ACE_RMCast_Module_Factory *factory_;
- // The factory used to create the modules attached to each proxy
+ //! The multicast group we subscribe and send to
ACE_INET_Addr mcast_group_;
- // The multicast group we subscribe and send to
+ //! The socket used to receive and send data
ACE_SOCK_Dgram_Mcast dgram_;
- // The socket
+ //! Use a Hash_Map to maintain the collection of proxies
typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map;
+ //! The collection of proxies
Map map_;
+ //! The event handler adapter
ACE_RMCast_UDP_Event_Handler eh_;
- // The event handler adapter
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Membership.cpp b/protocols/ace/RMCast/RMCast_Membership.cpp
index 6ee2690a41f..a23d7a756e5 100644
--- a/protocols/ace/RMCast/RMCast_Membership.cpp
+++ b/protocols/ace/RMCast/RMCast_Membership.cpp
@@ -28,14 +28,14 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
ACE_RMCast::Ack next_ack;
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- if (ack.highest_in_sequence < this->highest_in_sequence_)
+ if (ack.next_expected < this->next_expected_)
{
// @@ This violates an invariant of the class, shouldn't
// happen...
// ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n"));
return -1;
}
- else if (ack.highest_in_sequence == this->highest_in_sequence_)
+ else if (ack.next_expected == this->next_expected_)
{
// Nothing new, just continue....
// ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n"));
@@ -43,21 +43,23 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
}
// Possible update, re-evaluate the story...
- ACE_UINT32 highest_in_sequence = (*i)->highest_in_sequence ();
+ ACE_UINT32 next_expected = (*i)->next_expected ();
ACE_UINT32 highest_received = (*i)->highest_received ();
++i;
for (; i != end; ++i)
{
- ACE_UINT32 s = (*i)->highest_in_sequence ();
- if (s < highest_in_sequence)
- highest_in_sequence = s;
+ ACE_UINT32 s = (*i)->next_expected ();
+ if (s < next_expected)
+ next_expected = s;
ACE_UINT32 r = (*i)->highest_received ();
if (r > highest_received)
highest_received = r;
}
#if 0
- if (this->highest_in_sequence_ >= highest_in_sequence
+ // @@TODO: this is an important feature, disabled until it is
+ // fully debugged
+ if (this->next_expected_ >= next_expected
|| this->highest_received_ >= highest_received)
{
// No change....
@@ -65,12 +67,12 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack)
return 0;
}
#endif /* 0 */
- this->highest_in_sequence_ = highest_in_sequence;
+ this->next_expected_ = next_expected;
this->highest_received_ = highest_received;
if (this->next () == 0)
return 0;
next_ack.source = ack.source;
- next_ack.highest_in_sequence = this->highest_in_sequence_;
+ next_ack.next_expected = this->next_expected_;
next_ack.highest_received = this->highest_received_;
}
// @@ This looks like a race condition, next() is checked inside the
@@ -89,6 +91,8 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join)
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
if (this->proxies_.insert (join.source) == -1)
return -1;
+ // @@TODO: This may change the next Ack to send up, should
+ // recompute and send the right message if that was the case.
}
return this->ACE_RMCast_Module::join (join);
@@ -103,6 +107,8 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave)
{
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
(void) this->proxies_.remove (leave.source);
+ // @@TODO: This may change the next Ack to send up, should
+ // recompute and send the right message if that was the case.
}
return this->ACE_RMCast_Module::leave (leave);
diff --git a/protocols/ace/RMCast/RMCast_Membership.h b/protocols/ace/RMCast/RMCast_Membership.h
index a99a7752507..21ee1bea97b 100644
--- a/protocols/ace/RMCast/RMCast_Membership.h
+++ b/protocols/ace/RMCast/RMCast_Membership.h
@@ -28,41 +28,59 @@
class ACE_RMCast_Proxy;
+//! Track peer membership
+/*!
+ * Reliable senders of events need to know exactly how many peers are
+ * receiving the events, and how many events has each peer received so
+ * far.
+ * This class uses the Join, Leave and Ack messages to build that
+ * information, it also summarizes the Ack events and propagate only
+ * the global info to the upper layer.
+ */
class ACE_RMCast_Export ACE_RMCast_Membership : public ACE_RMCast_Module
{
- // = TITLE
- // Track Receiver membership
- //
- // = DESCRIPTION
- // Define the interface for all reliable multicast membership
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Membership (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Membership (void);
- // Destructor
- // = The RMCast_Module methods
+ //! Receive an process an Ack message
+ /*!
+ * After receiving the Ack message we find out what is the lowest
+ * sequence number received in order among all the acks received by
+ * the proxies in the collection. We also find out what is the
+ * highest sequence number received by any proxy.
+ * We only propagate that information back to the upper layer, and
+ * then only if there are any news since the last Ack.
+ */
virtual int ack (ACE_RMCast::Ack &);
+
+ //! Add a new member to the collection, using the <source> field in
+ //! the Join message
virtual int join (ACE_RMCast::Join &);
+
+ //! Remove a member from the collection, using the <source> field in
+ //! the Join message
virtual int leave (ACE_RMCast::Leave &);
protected:
+ //! Use an unbounded set to maintain the collection of proxies.
typedef ACE_Unbounded_Set<ACE_RMCast_Proxy*> Proxy_Collection;
typedef ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*> Proxy_Iterator;
+ //! The collection of proxies
Proxy_Collection proxies_;
- // The membership buffer
- ACE_UINT32 highest_in_sequence_;
- // The smallest value of <highest_in_sequence> for all the proxies
+ //! The smallest value of \param next_expected for all the proxies
+ ACE_UINT32 next_expected_;
+ //! The highest value of \param highest_received for all the proxies
ACE_UINT32 highest_received_;
- // The highest value of <highest_received> for all the proxies
+ //! Synchronization
ACE_SYNCH_MUTEX mutex_;
- // Synchronization
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Membership.i b/protocols/ace/RMCast/RMCast_Membership.i
index 0c3e33c2d01..b513c2d5141 100644
--- a/protocols/ace/RMCast/RMCast_Membership.i
+++ b/protocols/ace/RMCast/RMCast_Membership.i
@@ -2,7 +2,7 @@
ACE_INLINE
ACE_RMCast_Membership::ACE_RMCast_Membership (void)
- : highest_in_sequence_ (0)
+ : next_expected_ (0)
, highest_received_ (0)
{
}
diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h
index dc4077fa4ab..fad76caac53 100644
--- a/protocols/ace/RMCast/RMCast_Module.h
+++ b/protocols/ace/RMCast/RMCast_Module.h
@@ -36,55 +36,55 @@ class ACE_Time_Value;
class ACE_RMCast_Export ACE_RMCast_Module
{
public:
- // = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Module (void);
- //!< Constructor
+ //! Destructor
virtual ~ACE_RMCast_Module (void);
- //!< Destructor
+ //! Modifier for the next element in the stack
virtual int next (ACE_RMCast_Module *next);
- //!< Modifier for the next element in the stack
+ //! Accesor for the next element in the stack
virtual ACE_RMCast_Module* next (void) const;
- //!< Accesor for the next element in the stack
+ //! Modifier for the previous element in the stack
virtual int prev (ACE_RMCast_Module *prev);
- //!< Modifier for the previous element in the stack
+ //! Accesor for the previous element in the stack
virtual ACE_RMCast_Module* prev (void) const;
- //!< Accesor for the previous element in the stack
+ //! Initialize the module, setting up the next module
virtual int open (void);
- //!< Initialize the module, setting up the next module
+ //! Close the module.
virtual int close (void);
- //!< Close the module.
+ //! Push data through the stack
virtual int data (ACE_RMCast::Data &);
- //!< Push data through the stack
+ //! Push a polling request through the stack
virtual int poll (ACE_RMCast::Poll &);
- //!< Push a polling request through the stack
+ //! Push a message to ack a join request through the stack
virtual int ack_join (ACE_RMCast::Ack_Join &);
- //!< Push a message to ack a join request through the stack
+ //! Push a message to ack a leave request through the stack
virtual int ack_leave (ACE_RMCast::Ack_Leave &);
- //!< Push a message to ack a leave request through the stack
+ //! Push an ack mesage through the stack
virtual int ack (ACE_RMCast::Ack &);
- //!< Push an ack mesage through the stack
+ //! Push a join message through the stack
virtual int join (ACE_RMCast::Join &);
- //!< Push a join message through the stack
+ //! Push a leave message through the stack
virtual int leave (ACE_RMCast::Leave &);
- //!< Push a leave message through the stack
private:
//! The next element in the stack
ACE_RMCast_Module *next_;
+
//! The previous element in the stack
ACE_RMCast_Module *prev_;
};
diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h
index 722ad87d678..f0ea58df0e5 100644
--- a/protocols/ace/RMCast/RMCast_Module_Factory.h
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.h
@@ -27,19 +27,40 @@
class ACE_RMCast_Module;
class ACE_RMCast_IO_UDP;
+//! Create Module stacks
+/*!
+ * Different application will probably require different
+ * configurations in their Module stack, some will just want best
+ * effort semantics. Others will use Reliable communication with a
+ * maximum retransmission time. Furthermore, applications may want to
+ * receive messages in send order, or just as soon as they are
+ * received.
+ * Obviously most applications will want to change want happens once a
+ * message is completely received.
+ *
+ * To achieve all this flexibility the IO layer uses this factory to
+ * create the full stack of Modules corresponding to a single
+ * consumer.
+ * To keep the complexity under control the intention is to create
+ * helper Factories, such as Reliable_Module_Factory where
+ * applications only need to customize a few features.
+ */
class ACE_RMCast_Export ACE_RMCast_Module_Factory
{
- // = DESCRIPTION
- //
public:
+ //! Destructor
virtual ~ACE_RMCast_Module_Factory (void);
- // Destructor
+ //! Create a new proxy
virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0;
- // Create a new proxy
+ //! Destroy a proxy
+ /*!
+ * Some factories may allocate modules from a pool, or return the
+ * same module for all proxies. Consequently, only the factory
+ * knows how to destroy them.
+ */
virtual void destroy (ACE_RMCast_Module *) = 0;
- // Destroy a proxy
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.h b/protocols/ace/RMCast/RMCast_Partial_Message.h
index 9b71eb4a541..88fa9ab2f1a 100644
--- a/protocols/ace/RMCast/RMCast_Partial_Message.h
+++ b/protocols/ace/RMCast/RMCast_Partial_Message.h
@@ -26,44 +26,72 @@
#define ACE_RMCAST_DEFAULT_HOLE_COUNT 16
#endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */
+//! Represent a partially received message in the
+//! ACE_RMCast_Reassembly module
+/*!
+ * This class provides temporary storage for the fragments as they are
+ * received in the ACE_RMCast_Reassembly module. It also keeps track
+ * of what portions of the message are still missing.
+ */
class ACE_RMCast_Export ACE_RMCast_Partial_Message
{
public:
+ //! Constructor, reserve enough memory for the complete message
ACE_RMCast_Partial_Message (ACE_UINT32 message_size);
+
+ //! Destructor
~ACE_RMCast_Partial_Message (void);
+ //! Process a fragment
+ /*!
+ * A fragment starting at <offset> has been received, copy the
+ * fragment contents and update the list of holes.
+ */
int fragment_received (ACE_UINT32 message_size,
ACE_UINT32 offset,
ACE_Message_Block *mb);
+
+ //! Return 1 if the message is complete
int is_complete (void) const;
+ //! Return the body of the message, the memory is *not* owned by the
+ //! caller
ACE_Message_Block *message_body (void);
- // Return the body of the message, the memory is owned by the
- // class.
private:
+ //! Insert a new hole into the list
+ /*!
+ * The class keeps an array to represent the missing portions of the
+ * message. This method inserts a new hole, i.e. a new element in
+ * the array at index <i>. The <start> and <end> arguments represent
+ * the offsets of the missing portion of the message.
+ */
int insert_hole (size_t i,
ACE_UINT32 start,
ACE_UINT32 end);
- // Insert a new hole into the list
+ //! Remove a hole from the list
int remove_hole (size_t i);
- // Remove a hole from the list
private:
+ //! Maintain the message storage
ACE_Message_Block message_body_;
- // Used to rebuild the body of the message
+ //! Represent a missing portion of a message
struct Hole
{
+ //! Offset where the missing portion of the message starts
ACE_UINT32 start;
+ //! Offset where the missing portion of the message ends
ACE_UINT32 end;
};
+ //! Implement a growing array of Hole structures
+ //@{
Hole *hole_list_;
size_t max_hole_count_;
size_t hole_count_;
- // The current list of holes in the message_body.
+ //@}
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp
index 53d9d0b6726..f6b2bbec5e5 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.cpp
+++ b/protocols/ace/RMCast/RMCast_Proxy.cpp
@@ -15,9 +15,9 @@ ACE_RMCast_Proxy::~ACE_RMCast_Proxy (void)
}
ACE_UINT32
-ACE_RMCast_Proxy::highest_in_sequence (void) const
+ACE_RMCast_Proxy::next_expected (void) const
{
- return this->highest_in_sequence_;
+ return this->next_expected_;
}
ACE_UINT32
@@ -29,7 +29,7 @@ ACE_RMCast_Proxy::highest_received (void) const
int
ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack)
{
- this->highest_in_sequence_ = ack.highest_in_sequence;
+ this->next_expected_ = ack.next_expected;
this->highest_received_ = ack.highest_received;
return this->ACE_RMCast_Module::ack (ack);
}
diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h
index 414b74174fb..e0e6afe79b1 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.h
+++ b/protocols/ace/RMCast/RMCast_Proxy.h
@@ -48,27 +48,28 @@ public:
//! Destructor
virtual ~ACE_RMCast_Proxy (void);
-
- //! Return the highest sequence number received without any losses
- //! before it. Only applies to remote receiver proxies.
+
+ //! Return the next sequence number expected by the peer. Only
+ //! applies to remote receiver proxies.
/*!
- Please read the documentation in ACE_RMCast::Ack
+ * Please read the documentation in ACE_RMCast::Ack
*/
- virtual ACE_UINT32 highest_in_sequence (void) const;
+ virtual ACE_UINT32 next_expected (void) const;
//! Return the highest sequence number successfully received.
//! Only applies to remote receiver proxies.
/*!
- Please read the documentation in ACE_RMCast::Ack
+ * Please read the documentation in ACE_RMCast::Ack
*/
virtual ACE_UINT32 highest_received (void) const;
//@{
//! Send messages directly to the peer.
- /*! Send a message directly to the peer, i.e. the message is not
- sent through the multicast group and it may not be processed by
- all the layers in the stack.
- */
+ /*!
+ * Send a message directly to the peer, i.e. the message is not
+ * sent through the multicast group and it may not be processed by
+ * all the layers in the stack.
+ */
virtual int reply_data (ACE_RMCast::Data &) = 0;
virtual int reply_poll (ACE_RMCast::Poll &) = 0;
virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0;
@@ -79,8 +80,8 @@ public:
//@}
/*!
- Proxies process the ACK sequence numbers to save the sequence
- numbers reported from the remote peer.
+ * Proxies process the ACK sequence numbers to cache the ack
+ * information from the peer.
*/
virtual int ack (ACE_RMCast::Ack &);
@@ -88,7 +89,7 @@ private:
//@{
//! Cache the sequence numbers reported from the remote peer using
//! Ack messages
- ACE_UINT32 highest_in_sequence_;
+ ACE_UINT32 next_expected_;
ACE_UINT32 highest_received_;
//@}
};
diff --git a/protocols/ace/RMCast/RMCast_Proxy.i b/protocols/ace/RMCast/RMCast_Proxy.i
index f93feaa5639..6fee09fe9e5 100644
--- a/protocols/ace/RMCast/RMCast_Proxy.i
+++ b/protocols/ace/RMCast/RMCast_Proxy.i
@@ -2,7 +2,7 @@
ACE_INLINE
ACE_RMCast_Proxy::ACE_RMCast_Proxy (void)
- : highest_in_sequence_ (0)
+ : next_expected_ (0)
, highest_received_ (0)
{
}
diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp
index a996e1204d5..7e38cdf7c97 100644
--- a/protocols/ace/RMCast/RMCast_Retransmission.cpp
+++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp
@@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void)
{
}
+class ACE_RMCast_Resend_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next,
+ ACE_UINT32 max_sequence_number)
+ : n (0)
+ , next_ (next)
+ , max_sequence_number_ (max_sequence_number)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const & item)
+ {
+ if (key > this->max_sequence_number_)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::resend - message %d resent\n",
+ key));
+ ACE_RMCast::Data data = item;
+ int r = this->next_->data (data);
+ if (r != 0)
+ return r;
+ n++;
+ return 0;
+ }
+
+ int n;
+
+private:
+ ACE_RMCast_Module *next_;
+
+ ACE_UINT32 max_sequence_number_;
+};
+
int
-ACE_RMCast_Retransmission::close (void)
+ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number)
{
- Messages_Iterator end = this->messages_.end ();
+ if (this->next () == 0)
+ return 0;
- for (Messages_Iterator i = this->messages_.begin ();
- i != end;
- ++i)
- {
- ACE_Message_Block::release ((*i).item ().payload);
- }
- this->messages_.close ();
+ ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number);
+
+ if (this->messages_.for_each (&worker) == -1)
+ return -1;
+
+ return worker.n;
+}
+
+int
+ACE_RMCast_Retransmission::close (void)
+{
+ // @@
return 0;
}
@@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data)
int r = this->next ()->data (data);
if (r == 0)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
ACE_RMCast::Data copy = data;
copy.payload = ACE_Message_Block::duplicate (data.payload);
r = this->messages_.bind (data.sequence_number, copy);
@@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
ACE_RMCast::Ack_Join ack_join;
+#if 0
+ // @@
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
Messages_Iterator end = this->messages_.end ();
Messages_Iterator begin = this->messages_.begin ();
@@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
ack_join.next_sequence_number = (*begin).key ();
}
}
+#endif
(void) join.source->reply_ack_join (ack_join);
// @@ We should force a full retransmission of all the messages!
@@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join)
return 0;
}
+class ACE_RMCast_Ack_Worker
+ : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>
+{
+public:
+ ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack,
+ ACE_RMCast_Retransmission::Messages::Write_Guard &g,
+ ACE_RMCast_Retransmission::Messages *messages)
+ : ack_ (ack)
+ , ace_mon_ (g)
+ , messages_ (messages)
+ {
+ }
+
+ int work (ACE_UINT32 const & key,
+ ACE_RMCast::Data const &)
+ {
+ if (key >= this->ack_.next_expected)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ " Retransmission::ack - message %d erased\n",
+ key));
+ return this->messages_->unbind_i (this->ace_mon_, key);
+ }
+
+private:
+ ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&);
+ ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&);
+
+private:
+ ACE_RMCast::Ack &ack_;
+
+ ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_;
+
+ ACE_RMCast_Retransmission::Messages *messages_;
+};
+
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
- for (Messages_Iterator i = this->messages_.begin ();
- i != this->messages_.end ();
- /* do nothing */)
- {
- ACE_UINT32 key = (*i).key ();
- if (key > ack.highest_in_sequence)
- break;
- this->messages_.unbind (key);
- }
- return 0;
+ Messages::Write_Guard ace_mon (this->messages_.mutex_,
+ this->messages_.cond_,
+ this->messages_.pending_writes_,
+ this->messages_.writing_,
+ this->messages_.collection_);
+
+ ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
+
+ return this->messages_.for_each (&worker);
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
@@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC
template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>;
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
+template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>;
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_Retransmission.h b/protocols/ace/RMCast/RMCast_Retransmission.h
index 7c586fe5dd6..b7bc20d2914 100644
--- a/protocols/ace/RMCast/RMCast_Retransmission.h
+++ b/protocols/ace/RMCast/RMCast_Retransmission.h
@@ -19,6 +19,7 @@
#include "ace/pre.h"
#include "RMCast_Module.h"
+#include "RMCast_Copy_On_Write.h"
#include "ace/RB_Tree.h"
#include "ace/Synch.h"
@@ -26,38 +27,72 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+//! Store messages for retransmission in reliable configurations
+/*!
+ * Reliable configurations of the RMCast framework need to store
+ * messages on the sender side to resend them if one or more clients
+ * do not receive them successfully.
+ */
class ACE_RMCast_Export ACE_RMCast_Retransmission : public ACE_RMCast_Module
{
- // = TITLE
- // Reliable Multicast Retransmission
- //
- // = DESCRIPTION
- // Define the interface for all reliable multicast retransmission
public:
// = Initialization and termination methods.
+ //! Constructor
ACE_RMCast_Retransmission (void);
- // Constructor
+ //! Destructor
virtual ~ACE_RMCast_Retransmission (void);
- // Destructor
- // = The RMCast_Module methods
+ //! Use a Red-Black Tree to keep the queue of messages
+ typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection;
+ typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection_Iterator;
+
+ //! The messages are stored in the Copy_On_Write wrapper to provide
+ //! an efficient, but thread safe interface.
+ typedef ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,Collection,Collection_Iterator> Messages;
+
+ //! Resend messages
+ /*!
+ * Resends all the messages up to \param max_sequence_number
+ * Returns the number of messages sent, or -1 if there where any
+ * errors.
+ */
+ int resend (ACE_UINT32 max_sequence_number);
+
+ //! Cleanup all the stored messages
virtual int close (void);
+
+ //! Pass the message downstream, but also save it in the
+ //! retransmission queue
+ /*!
+ * Sequence number are assigned by the ACE_RMCast_Fragmentation
+ * class, consequently this class first passes the message
+ * downstream, to obtain the sequence number and then stores the
+ * message for later retransmission.
+ */
virtual int data (ACE_RMCast::Data &data);
+
+ //! Process an Ack message from the remote receivers.
+ /*!
+ * Normally this Ack message will be a summary of all the Ack
+ * messages received by the ACE_RMCast_Membership class
+ */
virtual int ack (ACE_RMCast::Ack &);
+
+ //! Detect when new members join the group and Ack_Join them
+ /*!
+ * When a new receiver joins the group this module sends an Ack_Join
+ * message with the next sequence number that the receiver should
+ * expect.
+ * The sequence number is obtained from the current list of cached
+ * messages.
+ */
virtual int join (ACE_RMCast::Join &);
protected:
- typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>
- Messages;
- typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>
- Messages_Iterator;
+ //! The retransmission buffer
Messages messages_;
- // The retransmission buffer
-
- ACE_SYNCH_MUTEX mutex_;
- // Synchronization
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
index 02798cee7f8..2a6e7c45d42 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
@@ -1,16 +1,5 @@
// $Id$
-// ============================================================================
-//
-// = DESCRIPTION
-// Implement an adapter between the ACE Reactor and the
-// ACE_RMCast_IO_UDP
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
#ifndef ACE_RMCAST_UDP_EVENT_HANDLER_H
#define ACE_RMCAST_UDP_EVENT_HANDLER_H
#include "ace/pre.h"
@@ -25,24 +14,41 @@
class ACE_RMCast_IO_UDP;
class ACE_INET_Addr;
+//! Implement an Adapter for the ACE_RMCast_IO_UDP class
+/*!
+ * Applications may wish to use the ACE_Reactor to demultiplex I/O
+ * events for an ACE_RMCast_IO_UDP object. However other application
+ * may choose to make ACE_RMCast_IO_UDP active, or they may dedicate
+ * their own threads for its events.
+ * To avoid couplin ACE_RMCast_IO_UDP with the Reactor we don't make
+ * it derived from ACE_Event_Handler or any other class in the Reactor
+ * framework, instead, this simple Adapter can forward the Reactor
+ * messages to an ACE_RMCast_IO_UDP object.
+ */
class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler
{
public:
- ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver);
- // Constructor
-
+ //! Constructor, save io_udp as the Adaptee in the Adapter pattern.
+ ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io_udp);
+
+ //! Destructor
+ /*!
+ * Notice that this class does not own the ACE_RMCast_IO_UDP
+ * adaptee, so it does not destroy it.
+ */
~ACE_RMCast_UDP_Event_Handler (void);
- // Destructor
- // = The Event_Handler methods
+ //@{
+ //! Documented in ACE_Event_Handler class
virtual ACE_HANDLE get_handle (void) const;
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_timeout (const ACE_Time_Value &current_time,
const void *act = 0);
+ //@}
private:
+ //! The adaptee
ACE_RMCast_IO_UDP *io_udp_;
- // The sender
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
index 010267b0cbb..2eb0983b171 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -123,7 +123,7 @@ ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size)
ACE_OS::memcpy (&tmp, buffer + 1,
sizeof(tmp));
- ack.highest_in_sequence = ACE_NTOHL (tmp);
+ ack.next_expected = ACE_NTOHL (tmp);
ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32),
sizeof(tmp));
ack.highest_received = ACE_NTOHL (tmp);
@@ -175,4 +175,3 @@ ACE_RMCast_UDP_Proxy::reply_leave (ACE_RMCast::Leave &leave)
{
return this->io_udp_->send_leave (leave, this->peer_addr_);
}
-
diff --git a/protocols/ace/RMCast/RMCast_Worker.cpp b/protocols/ace/RMCast/RMCast_Worker.cpp
new file mode 100644
index 00000000000..06254b8c0f6
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Worker.cpp
@@ -0,0 +1,19 @@
+// $Id$
+
+#ifndef ACE_RMCAST_WORKER_CPP
+#define ACE_RMCAST_WORKER_CPP
+
+#include "RMCast_Worker.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "RMCast_Worker.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(RMCast, RMCast_Worker, "$Id$")
+
+template<class KEY, class ITEM>
+ACE_RMCast_Worker<KEY,ITEM>::~ACE_RMCast_Worker (void)
+{
+}
+
+#endif /* ACE_RMCAST_WORKER_CPP */
diff --git a/protocols/ace/RMCast/RMCast_Worker.h b/protocols/ace/RMCast/RMCast_Worker.h
new file mode 100644
index 00000000000..d3eb3032ebc
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Worker.h
@@ -0,0 +1,36 @@
+/* -*- C++ -*- */
+// $Id$
+//
+
+#ifndef ACE_RMCAST_WORKER_H
+#define ACE_RMCAST_WORKER_H
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class KEY, class ITEM>
+class ACE_RMCast_Worker
+{
+public:
+ virtual ~ACE_RMCast_Worker (void);
+
+ virtual int work (KEY const & key,
+ ITEM const & item) = 0;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Worker.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "RMCast_Worker.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("RMCast_Worker.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_RMCAST_WORKER_H */
diff --git a/protocols/ace/RMCast/RMCast_Worker.i b/protocols/ace/RMCast/RMCast_Worker.i
new file mode 100644
index 00000000000..cfa1da318d3
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Worker.i
@@ -0,0 +1 @@
+// $Id$
diff --git a/tests/Makefile b/tests/Makefile
index 3a630c89bdf..0f1b0fc51d2 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -137,8 +137,12 @@ ifneq ($(ACE_HAS_GNUG_PRE_2_8),1)
endif
endif
+ifeq ($(rmcast),1)
+ DIRS += RMCast
+endif
+
include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
-include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nested.GNU
include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU
ifdef purify
diff --git a/tests/RMCast/Makefile b/tests/RMCast/Makefile
index c1949350a25..0741c7df36f 100644
--- a/tests/RMCast/Makefile
+++ b/tests/RMCast/Makefile
@@ -11,7 +11,8 @@
BIN = RMCast_Fragment_Test \
RMCast_Reassembly_Test \
RMCast_UDP_Best_Effort_Test \
- RMCast_Membership_Test
+ RMCast_Membership_Test \
+ RMCast_Retransmission_Test
PSRC=$(addsuffix .cpp,$(BIN))
LDLIBS = -lACE_RMCast
@@ -674,4 +675,167 @@ endif
$(ACE_ROOT)/ace/Stream_Modules.h \
$(ACE_ROOT)/ace/Stream_Modules.cpp
+.obj/RMCast_Retransmission_Test.o .obj/RMCast_Retransmission_Test.so .shobj/RMCast_Retransmission_Test.o .shobj/RMCast_Retransmission_Test.so: RMCast_Retransmission_Test.cpp \
+ ../test_config.h \
+ $(ACE_ROOT)/ace/pre.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/svc_export.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/Singleton.h \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ $(ACE_ROOT)/ace/Singleton.i \
+ $(ACE_ROOT)/ace/Singleton.cpp \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Managed_Object.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Module.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Export.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Module.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Retransmission.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Worker.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Worker.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.i \
+ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \
+ $(ACE_ROOT)/ace/Based_Pointer_Repository.h \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Free_List.cpp \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Malloc_T.cpp \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Containers_T.h \
+ $(ACE_ROOT)/ace/Containers_T.i \
+ $(ACE_ROOT)/ace/Containers_T.cpp \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/RB_Tree.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Retransmission.i \
+ $(ACE_ROOT)/ace/Task.h \
+ $(ACE_ROOT)/ace/Service_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.i \
+ $(ACE_ROOT)/ace/Service_Object.i \
+ $(ACE_ROOT)/ace/Thread_Manager.h \
+ $(ACE_ROOT)/ace/Thread_Manager.i \
+ $(ACE_ROOT)/ace/Task.i \
+ $(ACE_ROOT)/ace/Task_T.h \
+ $(ACE_ROOT)/ace/Message_Queue.h \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/Message_Block_T.h \
+ $(ACE_ROOT)/ace/Message_Block_T.i \
+ $(ACE_ROOT)/ace/Message_Block_T.cpp \
+ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \
+ $(ACE_ROOT)/ace/Message_Queue_T.h \
+ $(ACE_ROOT)/ace/Message_Queue_T.i \
+ $(ACE_ROOT)/ace/Message_Queue_T.cpp \
+ $(ACE_ROOT)/ace/Strategies.h \
+ $(ACE_ROOT)/ace/Strategies_T.h \
+ $(ACE_ROOT)/ace/Service_Config.h \
+ $(ACE_ROOT)/ace/SString.h \
+ $(ACE_ROOT)/ace/SString.i \
+ $(ACE_ROOT)/ace/Service_Config.i \
+ $(ACE_ROOT)/ace/Reactor.h \
+ $(ACE_ROOT)/ace/Handle_Set.h \
+ $(ACE_ROOT)/ace/Handle_Set.i \
+ $(ACE_ROOT)/ace/Timer_Queue.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.i \
+ $(ACE_ROOT)/ace/Timer_Queue_T.cpp \
+ $(ACE_ROOT)/ace/Reactor.i \
+ $(ACE_ROOT)/ace/Reactor_Impl.h \
+ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(ACE_ROOT)/ace/Synch_Options.h \
+ $(ACE_ROOT)/ace/Synch_Options.i \
+ $(ACE_ROOT)/ace/Hash_Map_Manager.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.i \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.cpp \
+ $(ACE_ROOT)/ace/Strategies_T.i \
+ $(ACE_ROOT)/ace/Strategies_T.cpp \
+ $(ACE_ROOT)/ace/Service_Repository.h \
+ $(ACE_ROOT)/ace/Service_Types.h \
+ $(ACE_ROOT)/ace/Service_Types.i \
+ $(ACE_ROOT)/ace/Service_Repository.i \
+ $(ACE_ROOT)/ace/WFMO_Reactor.h \
+ $(ACE_ROOT)/ace/Process_Mutex.h \
+ $(ACE_ROOT)/ace/Process_Mutex.inl \
+ $(ACE_ROOT)/ace/WFMO_Reactor.i \
+ $(ACE_ROOT)/ace/Strategies.i \
+ $(ACE_ROOT)/ace/Message_Queue.i \
+ $(ACE_ROOT)/ace/Task_T.i \
+ $(ACE_ROOT)/ace/Task_T.cpp \
+ $(ACE_ROOT)/ace/Module.h \
+ $(ACE_ROOT)/ace/Module.i \
+ $(ACE_ROOT)/ace/Module.cpp \
+ $(ACE_ROOT)/ace/Stream_Modules.h \
+ $(ACE_ROOT)/ace/Stream_Modules.cpp
+
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/tests/RMCast/RMCast_Membership_Test.cpp b/tests/RMCast/RMCast_Membership_Test.cpp
index f1e320b43ef..240382ba758 100644
--- a/tests/RMCast/RMCast_Membership_Test.cpp
+++ b/tests/RMCast/RMCast_Membership_Test.cpp
@@ -40,7 +40,7 @@ public:
return this->joined_;
}
//! Set the flag to remember if this proxy has joined the group or
- //! not.
+ //! not.
void joined (int j)
{
this->joined_ = j;
@@ -286,12 +286,12 @@ Tester::ack (ACE_RMCast::Ack &ack)
// ACE_DEBUG ((LM_DEBUG,
// "Received ack in Tester %d,%d\n",
- // ack.highest_in_sequence,
+ // ack.next_expected,
// ack.highest_received));
// Assume the lock is held, verify that the ack message satisfy the
// invariants...
- ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 next_expected;
ACE_UINT32 highest_received;
int set = 0;
for (size_t i = 0; i != nproxy; ++i)
@@ -301,18 +301,18 @@ Tester::ack (ACE_RMCast::Ack &ack)
if (!set)
{
set = 1;
- highest_in_sequence = this->proxy_[i].highest_in_sequence ();
+ next_expected = this->proxy_[i].next_expected ();
highest_received = this->proxy_[i].highest_received ();
}
else
{
- if (highest_in_sequence >
- this->proxy_[i].highest_in_sequence ())
+ if (next_expected >
+ this->proxy_[i].next_expected ())
{
- highest_in_sequence =
- this->proxy_[i].highest_in_sequence ();
+ next_expected =
+ this->proxy_[i].next_expected ();
}
- if (highest_received <
+ if (highest_received <
this->proxy_[i].highest_received ())
{
highest_received =
@@ -325,10 +325,10 @@ Tester::ack (ACE_RMCast::Ack &ack)
return 0;
// Check the invariants
- if (ack.highest_in_sequence != highest_in_sequence)
+ if (ack.next_expected != next_expected)
{
ACE_ERROR_RETURN ((LM_ERROR,
- "Invalid highest_in_sequence in Ack\n"),
+ "Invalid next_expected in Ack\n"),
-1);
}
if (ack.highest_received != highest_received)
@@ -395,8 +395,8 @@ Tester::generate_acks (int iterations)
ACE_RMCast::Ack ack;
ack.source = &this->proxy_[i];
- ack.highest_in_sequence =
- this->proxy_[i].highest_in_sequence ();
+ ack.next_expected =
+ this->proxy_[i].next_expected ();
ack.highest_received =
this->proxy_[i].highest_received ();
@@ -415,13 +415,13 @@ Tester::generate_acks (int iterations)
ack.highest_received++;
break;
default:
- if (ack.highest_received > ack.highest_in_sequence)
- ack.highest_in_sequence++;
+ if (ack.highest_received > ack.next_expected)
+ ack.next_expected++;
break;
}
// ACE_DEBUG ((LM_DEBUG,
// "Sending ack message (%d,%d) through proxy %d\n",
- // ack.highest_in_sequence,
+ // ack.next_expected,
// ack.highest_received,
// i));
int result = this->proxy_[i].ack (ack);
diff --git a/tests/RMCast/RMCast_Retransmission_Test.cpp b/tests/RMCast/RMCast_Retransmission_Test.cpp
new file mode 100644
index 00000000000..a40c0fd4177
--- /dev/null
+++ b/tests/RMCast/RMCast_Retransmission_Test.cpp
@@ -0,0 +1,457 @@
+// $Id$
+
+#include "test_config.h"
+#include "ace/RMCast/RMCast_Proxy.h"
+#include "ace/RMCast/RMCast_Retransmission.h"
+
+#include "ace/Task.h"
+
+ACE_RCSID(tests, RMCast_Retransmission_Test, "$Id$")
+
+// ****************************************************************
+
+class Tester;
+
+//! Simple proxy for the ACE_RMCast_Retransmission test harness
+/*!
+ * Implement a simple version of the ACE_RMCast_Proxy class used in
+ * the ACE_RMCast_Retransmission test harness.
+ */
+class Test_Proxy : public ACE_RMCast_Proxy
+{
+public:
+ Test_Proxy (void);
+
+ void set_tester (Tester *tester)
+ {
+ this->tester_ = tester;
+ }
+
+ //! Get the flag to remember if this proxy has joined the group or
+ //! not.
+ int joined (void) const
+ {
+ return this->joined_;
+ }
+
+ //! Set the flag to remember if this proxy has joined the group or
+ //! not.
+ void joined (int j)
+ {
+ this->joined_ = j;
+ }
+
+ //! Most of the reply_ methods just return 0, there is no real remote
+ //! peer, this is just a test harness
+ //@{
+ virtual int reply_data (ACE_RMCast::Data &)
+ {
+ return 0;
+ }
+ virtual int reply_poll (ACE_RMCast::Poll &)
+ {
+ return 0;
+ }
+ //! Must check that that sequence number is reasonable
+ virtual int reply_ack_join (ACE_RMCast::Ack_Join &);
+ virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &)
+ {
+ return 0;
+ }
+ virtual int reply_ack (ACE_RMCast::Ack &)
+ {
+ return 0;
+ }
+ virtual int reply_join (ACE_RMCast::Join &)
+ {
+ return 0;
+ }
+ virtual int reply_leave (ACE_RMCast::Leave &)
+ {
+ return 0;
+ }
+ //@}
+
+ int data (ACE_RMCast::Data &data);
+ int ack (ACE_RMCast::Ack &ack);
+
+private:
+ //! Remember if we joined the group already.
+ int joined_;
+
+ //! Keep a reference to the main testing class so it can be called
+ //! back.
+ Tester *tester_;
+
+ //! The test is randomized to get better coverage. This is the seed
+ //! variable for the test
+ ACE_RANDR_TYPE seed_;
+
+ //! Synchronize internal data structures.
+ ACE_SYNCH_MUTEX lock_;
+};
+
+// ****************************************************************
+
+//! The number of proxies used in the test
+/*!
+ * Not all member will be present in the group at the same time. But
+ * this variable controls the maximum number
+ */
+const size_t nproxy = 16;
+//! A simple module to receive the messages from ACE_RMCast_Retransmission
+/*!
+ * The ACE_RMCast_Retransmission layer pushes messages to its next module
+ * when all the members have acked a message, when a new member joins,
+ * when a member leaves, etc.
+ * This class will verify that the messages are exactly what we
+ * expect.
+ */
+class Tester : public ACE_RMCast_Module
+{
+public:
+ Tester (void);
+
+ //! Run the test for \iterations times
+ void run (int iterations);
+
+ //! One of the proxies has received an Ack_Join message, we need to
+ //! validate it
+ int reply_ack_join (Test_Proxy *proxy, ACE_RMCast::Ack_Join &ack_join);
+
+ //! One of the proxies has received an Ack message.
+ /*!
+ * In this method we simulate the role of the Membership layer,
+ * however we just do brute force instead of trying to optimize the
+ * Ack processing
+ */
+ int ack (Test_Proxy *proxy, ACE_RMCast::Ack &ack);
+
+ virtual int data (ACE_RMCast::Data &data);
+
+private:
+ //! Generate a new message to drive the test
+ void generate_one_message (void);
+
+ //! Send a single Ack that summarizes the state of all the proxies.
+ int send_ack (void);
+
+ //! Ask the retransmission layer to resend lost messages
+ int resend (void);
+
+private:
+ //! The array of proxies
+ Test_Proxy proxy_[nproxy];
+
+ //! The Retransmission layer
+ ACE_RMCast_Retransmission retransmission_;
+
+ //! Synchronize internal data structures
+ ACE_SYNCH_MUTEX lock_;
+
+ //! The test is randomized to get better coverage. This is the seed
+ //! variable for the test
+ ACE_RANDR_TYPE seed_;
+
+ //! Generate sequence numbers for the Retransmission layer
+ ACE_UINT32 sequence_number_generator_;
+};
+
+// ****************************************************************
+
+//! An Adapter to run Tester::run the test is a separate thread
+class Task : public ACE_Task_Base
+{
+public:
+ Task (Tester *tester);
+
+ // = Read the documentation in "ace/Task.h"
+ int svc (void);
+
+private:
+ //! The tester object.
+ Tester *tester_;
+};
+
+// ****************************************************************
+
+int
+main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("RMCast_Retransmission_Test"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("This is ACE Version %u.%u.%u\n\n"),
+ ACE::major_version(),
+ ACE::minor_version(),
+ ACE::beta_version()));
+
+ {
+ ACE_DEBUG ((LM_DEBUG, "Running single threaded test\n"));
+ //! Run the test in single threaded mode
+ Tester tester;
+ tester.run (100);
+ }
+ {
+ ACE_DEBUG ((LM_DEBUG, "Running multi threaded test\n"));
+ //! Run the test in multi-threaded mode
+ Tester tester;
+ Task task (&tester);
+ if (task.activate (THR_NEW_LWP|THR_JOINABLE, 4) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Cannot activate the threads\n"), 1);
+ ACE_Thread_Manager::instance ()->wait ();
+ }
+
+ ACE_END_TEST;
+ return 0;
+}
+
+// ****************************************************************
+
+Tester::Tester (void)
+ : seed_ (ACE_OS::gethrtime ())
+ , sequence_number_generator_ (0)
+{
+ // Initialize the stack...
+ this->retransmission_.next (this);
+
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ this->proxy_[i].set_tester (this);
+ this->proxy_[i].joined (1);
+ }
+}
+
+void
+Tester::run (int iterations)
+{
+ for (int i = 0; i < iterations; ++i)
+ {
+ // Push data
+ this->generate_one_message ();
+
+ this->send_ack ();
+
+ // Re-send unacked data
+ if (i % 5 == 0)
+ {
+ this->resend ();
+ this->send_ack ();
+ }
+ }
+
+ while (this->resend () != 0)
+ {
+ this->send_ack ();
+ }
+}
+
+int
+Tester::ack (Test_Proxy *, ACE_RMCast::Ack &)
+{
+ return this->send_ack ();
+}
+
+int
+Tester::send_ack ()
+{
+ ACE_RMCast::Ack ack;
+
+ int set = 0;
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ if (this->proxy_[i].joined () == 0)
+ continue;
+ if (!set)
+ {
+ ack.next_expected = this->proxy_[i].next_expected ();
+ ack.highest_received = this->proxy_[i].highest_received ();
+ set = 1;
+ }
+ else
+ {
+ if (ack.next_expected > this->proxy_[i].next_expected ())
+ ack.next_expected = this->proxy_[i].next_expected ();
+ if (ack.highest_received < this->proxy_[i].highest_received ())
+ ack.highest_received = this->proxy_[i].highest_received ();
+ }
+ }
+ if (!set)
+ return 0;
+ ACE_DEBUG ((LM_DEBUG, "Tested::ack - (%d,%d)\n",
+ ack.next_expected, ack.highest_received));
+ return this->retransmission_.ack (ack);
+}
+
+void
+Tester::generate_one_message (void)
+{
+ ACE_Message_Block payload (1024);
+ payload.wr_ptr (1024);
+
+ ACE_RMCast::Data data;
+ data.payload = &payload;
+ data.sequence_number = -1;
+
+ int result = this->retransmission_.data (data);
+ if (result != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Retransmission::data returned %d\n",
+ result));
+ }
+}
+
+int
+Tester::resend (void)
+{
+ ACE_UINT32 max_sequence_number =
+ this->sequence_number_generator_;
+ int r = this->retransmission_.resend (max_sequence_number);
+ if (r == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Error returned from Retransmission::resend\n"));
+ }
+ return r;
+}
+
+int
+Tester::data (ACE_RMCast::Data &data)
+{
+ // After going through the Retransmission layer we got some data,
+ // simulate the work of the following layers:
+ // - Fragmentation: assing message sequence numbers
+ // - IO_XXX: send to all known members
+ // - Reassembly: reconstruct the message on the receiving side.
+
+ if (data.sequence_number == -1)
+ {
+ data.total_size = 1024;
+ data.fragment_offset = 0;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ data.sequence_number = this->sequence_number_generator_++;
+ }
+ for (size_t i = 0; i != nproxy; ++i)
+ {
+ int result = this->proxy_[i].data (data);
+ if (result != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Proxy::data returned %d for proxy %d\n",
+ result, i));
+ return -1;
+ }
+ }
+ return 0;
+}
+
+// ****************************************************************
+
+Task::Task (Tester *tester)
+ : tester_ (tester)
+{
+}
+
+int
+Task::svc (void)
+{
+ this->tester_->run (100);
+ return 0;
+}
+
+// ****************************************************************
+
+Test_Proxy::Test_Proxy (void)
+ : joined_ (0)
+ , tester_ (0)
+ , seed_ (ACE_static_cast(ACE_RANDR_TYPE,ACE_OS::gethrtime ()))
+{
+}
+
+int
+Test_Proxy::data (ACE_RMCast::Data &data)
+{
+ ACE_DEBUG ((LM_DEBUG, " (%t) Proxy receives message %d\n",
+ data.sequence_number));
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ int c = ACE_OS::rand_r (this->seed_) % 100;
+
+ const int success_ratio = 95;
+
+ if (this->next_expected () > data.sequence_number)
+ {
+ // An old message, resend the ack...
+ ACE_RMCast::Ack ack;
+ ack.next_expected = this->next_expected ();
+ ack.highest_received = this->highest_received ();
+ ACE_DEBUG ((LM_DEBUG, "....it is an already accepted message\n"));
+ // Ack the message....
+ return this->ack (ack);
+ }
+
+ if (this->next_expected () == data.sequence_number)
+ {
+ // The message is the next one that we expected...
+ // Two choices: accept the message as successfully received or
+ // reject it, we accept them most of the time.
+ if (c > success_ratio)
+ {
+ // We ignore the message completely as if it was lost in the
+ // network
+ ACE_DEBUG ((LM_DEBUG, "....and drops it\n"));
+ return 0;
+ }
+
+ ACE_RMCast::Ack ack;
+ ack.next_expected = data.sequence_number + 1;
+ if (ack.next_expected < this->highest_received ())
+ {
+ ack.highest_received = this->highest_received ();
+ }
+ else
+ {
+ ack.highest_received = data.sequence_number;
+ }
+ ACE_DEBUG ((LM_DEBUG, "....and accepts it\n"));
+ // Ack the message....
+ return this->ack (ack);
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "....the message is out of order\n"));
+ // This is an out of sequence number, maybe it is lost...
+ if (c > success_ratio)
+ {
+ ACE_DEBUG ((LM_DEBUG, "........and is dropped\n"));
+ return 0;
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "........and is accepted\n"));
+ ACE_RMCast::Ack ack;
+ ack.next_expected = this->next_expected ();
+ if (data.sequence_number < this->highest_received ())
+ {
+ ack.highest_received = this->highest_received ();
+ }
+ else
+ {
+ ack.highest_received = data.sequence_number;
+ }
+ // Ack the message....
+ return this->ack (ack);
+}
+
+int
+Test_Proxy::ack (ACE_RMCast::Ack &ack)
+{
+ (void) this->ACE_RMCast_Proxy::ack (ack);
+ (void) this->tester_->ack (this, ack);
+ return 0;
+}
+
+int
+Test_Proxy::reply_ack_join (ACE_RMCast::Ack_Join & /* ack_join */)
+{
+ return 0; // this->tester_->proxy_reply_ack_join (this, ack_join);
+}
diff --git a/tests/RMCast/RMCast_Retransmission_Test.dsp b/tests/RMCast/RMCast_Retransmission_Test.dsp
new file mode 100644
index 00000000000..b60a4a9c677
--- /dev/null
+++ b/tests/RMCast/RMCast_Retransmission_Test.dsp
@@ -0,0 +1,96 @@
+# Microsoft Developer Studio Project File - Name="RMCast_Retransmission_Test" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=RMCast_Retransmission_Test - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_Retransmission_Test.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_Retransmission_Test.mak" CFG="RMCast_Retransmission_Test - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "RMCast_Retransmission_Test - Win32 Release" (based on "Win32 (x86) Console Application")
+!MESSAGE "RMCast_Retransmission_Test - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "RMCast_Retransmission_Test - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\.." /I ".." /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /machine:I386
+# ADD LINK32 ace.lib ACE_RMCast.lib /nologo /subsystem:console /machine:I386 /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ELSEIF "$(CFG)" == "RMCast_Retransmission_Test - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "Debug"
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\.." /I ".." /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 ACE_RMCastd.lib aced.lib /nologo /subsystem:console /debug /machine:I386 /out:"RMCast_Retransmission_Test.exe" /pdbtype:sept /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ENDIF
+
+# Begin Target
+
+# Name "RMCast_Retransmission_Test - Win32 Release"
+# Name "RMCast_Retransmission_Test - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
+# Begin Source File
+
+SOURCE=.\RMCast_Retransmission_Test.cpp
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/tests/RMCast/RMCast_Tests.dsw b/tests/RMCast/RMCast_Tests.dsw
index 5dc90ff201b..e9e6820ccc2 100644
--- a/tests/RMCast/RMCast_Tests.dsw
+++ b/tests/RMCast/RMCast_Tests.dsw
@@ -39,6 +39,18 @@ Package=<4>
###############################################################################
+Project: "RMCast_Retransmission_Test"=.\RMCast_Retransmission_Test.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "RMCast_UDP_Best_Effort_Test"=.\RMCast_UDP_Best_Effort_Test.dsp - Package Owner=<4>
Package=<5>