diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-02 18:40:06 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-02 18:40:06 +0000 |
commit | 3c7edb739b596313a3e15fe1373bd488c2f37009 (patch) | |
tree | ec3f3e0df41c8f76d9c1ae3d472221e78a23d61f | |
parent | 6c8cdfe85f70b9be20d1dd80f9730dae491ff403 (diff) | |
download | ATCD-3c7edb739b596313a3e15fe1373bd488c2f37009.tar.gz |
ChangeLogTag:Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
57 files changed, 2842 insertions, 596 deletions
diff --git a/ChangeLog b/ChangeLog index 549d519a4e6..0b20947f8fd 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,13 +1,68 @@ +Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu> + + * ace/RMCast/RMCast_Retransmission.h: + * ace/RMCast/RMCast_Retransmission.cpp: + Fixed several minor problems in the Retransmission module. + Use Copy_On_Write dispatching for synchronization of the + internal message buffer. + + * ace/RMCast/RMCast_Copy_On_Write.h: + * ace/RMCast/RMCast_Copy_On_Write.i: + * ace/RMCast/RMCast_Copy_On_Write.cpp: + Implement Copy_On_Write semantics for a collection like the + retransmission buffer. This is based on the ESF_Copy_On_Write + strategy developed for the event channel. + We should try to refactor this one in a reusable strategy. + + * ace/RMCast/RMCast_Worker.h: + * ace/RMCast/RMCast_Worker.i: + * ace/RMCast/RMCast_Worker.cpp: + To implement copy-on-write we need the "alternative" form of + iterators. + + * ace/RMCast/RMCast.h: + * ace/RMCast/RMCast_Fragment.h: + * ace/RMCast/RMCast_IO_UDP.h: + * ace/RMCast/RMCast_IO_UDP.cpp: + * ace/RMCast/RMCast_Membership.h: + * ace/RMCast/RMCast_Membership.i: + * ace/RMCast/RMCast_Membership.cpp: + * ace/RMCast/RMCast_Module.h: + * ace/RMCast/RMCast_Module_Factory.h: + * ace/RMCast/RMCast_Partial_Message.h: + * ace/RMCast/RMCast_Proxy.h: + * ace/RMCast/RMCast_Proxy.i: + * ace/RMCast/RMCast_Proxy.cpp: + * ace/RMCast/RMCast_UDP_Event_Handler.h: + * ace/RMCast/RMCast_UDP_Proxy.cpp: + * tests/RMCast/RMCast_Membership_Test.cpp: + Made the comments more doxygen friendly. + Renamed the highest_in_sequence field to next_expected, the new + name really reflects its semantics, this is the next sequence + number that the peer is expecting. All the previous numbers + have been either received or are simply assumed lost. + + * tests/Makefile: + If the rmcast makefile flag is set to 1 we build the RMCast + subdirectory too. + + * tests/RMCast/Makefile: + * tests/RMCast/RMCast_Tests.dsw: + * tests/RMCast/RMCast_Retransmission_Test.dsp: + * tests/RMCast/RMCast_Retransmission_Test.cpp: + New test for the Retransmission module + + Sun Oct 01 15:50:42 2000 Darrell Brunsch <brunsch@uci.edu> * bin/auto_run_tests.pl: Added support for a sandbox program that can shutdown a test - if it hangs. + if it hangs. Fri Sep 29 16:32:22 2000 Darrell Brunsch <brunsch@uci.edu> - * bin/msvc_auto_compile.pl: + * bin/msvc_auto_compile.pl: Added the BE and FE projects for TAO_IDL so the static version gets built correctly in auto_compiles. @@ -24,123 +79,123 @@ Fri Sep 29 16:17:34 2000 Steve Huston <shuston@riverace.com> Thu Sep 28 22:06:44 2000 Ossama Othman <ossama@uci.edu> - * ace/SSL/SSL_SOCK_Connector.h: - * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector, - shared_connect_start, shared_connect_finish, connect, complete): + * ace/SSL/SSL_SOCK_Connector.h: + * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector, + shared_connect_start, shared_connect_finish, connect, complete): - Made ACE_Time_Value arguments const to match the changes - detailed in: + Made ACE_Time_Value arguments const to match the changes + detailed in: - Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> + Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> Fri Sep 29 00:23:59 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/Log_Msg.h (ACE_RETURN): Fixed minor error. + * ace/Log_Msg.h (ACE_RETURN): Fixed minor error. Thu Sep 28 22:13:04 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/Log_Msg: Macros are evil! All the ACE debugging macros were - evaluating the user arguments twice, once in - log_priority_enabled() and the other in log(). The problem was - that set() was making a deep copy of the filename. Hence, this - change: + * ace/Log_Msg: Macros are evil! All the ACE debugging macros were + evaluating the user arguments twice, once in + log_priority_enabled() and the other in log(). The problem was + that set() was making a deep copy of the filename. Hence, this + change: - Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> - was made to prevent the deep copy which was unnecessary when the - message was not actually logged. However, this resulted in the - macro evaluating the user arguments twice. Previously, a - statement like this: + was made to prevent the deep copy which was unnecessary when the + message was not actually logged. However, this resulted in the + macro evaluating the user arguments twice. Previously, a + statement like this: ACE_DEBUG ((LM_DEBUG, "timeout occured, iterations left %d\n", --iterations)); - <iterations> got reduced by one - now it got reduced by two ;-) + <iterations> got reduced by one - now it got reduced by two ;-) - The solution was to make a shallow copy of the filename in - conditional_set(). Then in log(), if the log priority is - correct, make a deep copy and then continue will log(). The - macros were changed to call conditional_set() instead of set(). + The solution was to make a shallow copy of the filename in + conditional_set(). Then in log(), if the log priority is + correct, make a deep copy and then continue will log(). The + macros were changed to call conditional_set() instead of set(). - Also, changed ACE_RETURN to specify all the parameters, - including <restart>, <callback>, and <stream>. Otherwise, the - default parameters of set() will end up losing these parameters - set by the user. + Also, changed ACE_RETURN to specify all the parameters, + including <restart>, <callback>, and <stream>. Otherwise, the + default parameters of set() will end up losing these parameters + set by the user. Thu Sep 28 15:49:00 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> - * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify - that the ACE_Time_Value's are *absolute* time. Thanks to Pedro - for reporting this. Thanks to Pedro Brandao - <pbrandao@inescn.pt> for reporting this. + * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify + that the ACE_Time_Value's are *absolute* time. Thanks to Pedro + for reporting this. Thanks to Pedro Brandao + <pbrandao@inescn.pt> for reporting this. Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/SOCK_Connector.cpp: ACE_Time_Value args to - ACE_SOCK_Connector could be const. Had to patch functions in - the following files to make it happen: + * ace/SOCK_Connector.cpp: ACE_Time_Value args to + ACE_SOCK_Connector could be const. Had to patch functions in + the following files to make it happen: - ACE.cpp - ACE.h - OS.h - OS.i - SOCK_Connector.cpp - SOCK_Connector.h + ACE.cpp + ACE.h + OS.h + OS.i + SOCK_Connector.cpp + SOCK_Connector.h - Thanks to Steve Huston <shuston@riverace.com> for reporting - this. This also fixes bug 673. + Thanks to Steve Huston <shuston@riverace.com> for reporting + this. This also fixes bug 673. Thu Sep 28 11:14:29 2000 Martin Stack <mstack@cambertx.com> - * ace/config-freebsd.h: - * ace/config-freebsd-pthread.h: - * ace/config-linux-common.h: - * ace/config-irix6.x-common.h: Added/Renamed to - ACE_USES_NEW_TERMIOS_STRUCT. + * ace/config-freebsd.h: + * ace/config-freebsd-pthread.h: + * ace/config-linux-common.h: + * ace/config-irix6.x-common.h: Added/Renamed to + ACE_USES_NEW_TERMIOS_STRUCT. - * ace/TTY_IO.h: - * ace/TTY_IO.cpp: The Win32 section was modified to implement a - non-blocking read when read-timeout=0 is set. Also, it was - modified to ensure proper operations when a read_timeout is - required. + * ace/TTY_IO.h: + * ace/TTY_IO.cpp: The Win32 section was modified to implement a + non-blocking read when read-timeout=0 is set. Also, it was + modified to ensure proper operations when a read_timeout is + required. - Code was added to enable the DTR line on both Win32 and unix - platforms when the port is opened. + Code was added to enable the DTR line on both Win32 and unix + platforms when the port is opened. - Several new flags where added to give proper access to the - serial device. + Several new flags where added to give proper access to the + serial device. - Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to - "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose. + Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to + "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose. Thu Sep 28 09:01:19 2000 Ossama Othman <ossama@uci.edu> - * ace/config-g++-common.h: + * ace/config-g++-common.h: - Reverted my g++ 2.95 updates. They work on all platforms but - the cross-compiler used for VxWorks. + Reverted my g++ 2.95 updates. They work on all platforms but + the cross-compiler used for VxWorks. Wed Sep 27 16:17:36 2000 Ossama Othman <ossama@uci.edu> - * ace/IOStream.h: - * ace/IOStream_T.h: + * ace/IOStream.h: + * ace/IOStream_T.h: - Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to - `IOStream_T.h'. They weren't needed in `IOStream.h'. + Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to + `IOStream_T.h'. They weren't needed in `IOStream.h'. - * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR, - ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES, - ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS): + * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR, + ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES, + ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS): - G++ 2.95.x properly support the auto_ptr class, templates with - static data members, and inlined template functions. + G++ 2.95.x properly support the auto_ptr class, templates with + static data members, and inlined template functions. Wed Sep 27 14:02:30 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * examples/Reactor/WFMO_Reactor/test_abandoned.cpp - (handle_timeout): Moved <--this->iterations_> outside the DEBUG - statement. + * examples/Reactor/WFMO_Reactor/test_abandoned.cpp + (handle_timeout): Moved <--this->iterations_> outside the DEBUG + statement. Wed Sep 27 08:46:12 2000 Carlos O'Ryan <coryan@uci.edu> diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index 549d519a4e6..0b20947f8fd 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,13 +1,68 @@ +Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu> + + * ace/RMCast/RMCast_Retransmission.h: + * ace/RMCast/RMCast_Retransmission.cpp: + Fixed several minor problems in the Retransmission module. + Use Copy_On_Write dispatching for synchronization of the + internal message buffer. + + * ace/RMCast/RMCast_Copy_On_Write.h: + * ace/RMCast/RMCast_Copy_On_Write.i: + * ace/RMCast/RMCast_Copy_On_Write.cpp: + Implement Copy_On_Write semantics for a collection like the + retransmission buffer. This is based on the ESF_Copy_On_Write + strategy developed for the event channel. + We should try to refactor this one in a reusable strategy. + + * ace/RMCast/RMCast_Worker.h: + * ace/RMCast/RMCast_Worker.i: + * ace/RMCast/RMCast_Worker.cpp: + To implement copy-on-write we need the "alternative" form of + iterators. + + * ace/RMCast/RMCast.h: + * ace/RMCast/RMCast_Fragment.h: + * ace/RMCast/RMCast_IO_UDP.h: + * ace/RMCast/RMCast_IO_UDP.cpp: + * ace/RMCast/RMCast_Membership.h: + * ace/RMCast/RMCast_Membership.i: + * ace/RMCast/RMCast_Membership.cpp: + * ace/RMCast/RMCast_Module.h: + * ace/RMCast/RMCast_Module_Factory.h: + * ace/RMCast/RMCast_Partial_Message.h: + * ace/RMCast/RMCast_Proxy.h: + * ace/RMCast/RMCast_Proxy.i: + * ace/RMCast/RMCast_Proxy.cpp: + * ace/RMCast/RMCast_UDP_Event_Handler.h: + * ace/RMCast/RMCast_UDP_Proxy.cpp: + * tests/RMCast/RMCast_Membership_Test.cpp: + Made the comments more doxygen friendly. + Renamed the highest_in_sequence field to next_expected, the new + name really reflects its semantics, this is the next sequence + number that the peer is expecting. All the previous numbers + have been either received or are simply assumed lost. + + * tests/Makefile: + If the rmcast makefile flag is set to 1 we build the RMCast + subdirectory too. + + * tests/RMCast/Makefile: + * tests/RMCast/RMCast_Tests.dsw: + * tests/RMCast/RMCast_Retransmission_Test.dsp: + * tests/RMCast/RMCast_Retransmission_Test.cpp: + New test for the Retransmission module + + Sun Oct 01 15:50:42 2000 Darrell Brunsch <brunsch@uci.edu> * bin/auto_run_tests.pl: Added support for a sandbox program that can shutdown a test - if it hangs. + if it hangs. Fri Sep 29 16:32:22 2000 Darrell Brunsch <brunsch@uci.edu> - * bin/msvc_auto_compile.pl: + * bin/msvc_auto_compile.pl: Added the BE and FE projects for TAO_IDL so the static version gets built correctly in auto_compiles. @@ -24,123 +79,123 @@ Fri Sep 29 16:17:34 2000 Steve Huston <shuston@riverace.com> Thu Sep 28 22:06:44 2000 Ossama Othman <ossama@uci.edu> - * ace/SSL/SSL_SOCK_Connector.h: - * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector, - shared_connect_start, shared_connect_finish, connect, complete): + * ace/SSL/SSL_SOCK_Connector.h: + * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector, + shared_connect_start, shared_connect_finish, connect, complete): - Made ACE_Time_Value arguments const to match the changes - detailed in: + Made ACE_Time_Value arguments const to match the changes + detailed in: - Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> + Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> Fri Sep 29 00:23:59 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/Log_Msg.h (ACE_RETURN): Fixed minor error. + * ace/Log_Msg.h (ACE_RETURN): Fixed minor error. Thu Sep 28 22:13:04 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/Log_Msg: Macros are evil! All the ACE debugging macros were - evaluating the user arguments twice, once in - log_priority_enabled() and the other in log(). The problem was - that set() was making a deep copy of the filename. Hence, this - change: + * ace/Log_Msg: Macros are evil! All the ACE debugging macros were + evaluating the user arguments twice, once in + log_priority_enabled() and the other in log(). The problem was + that set() was making a deep copy of the filename. Hence, this + change: - Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> - was made to prevent the deep copy which was unnecessary when the - message was not actually logged. However, this resulted in the - macro evaluating the user arguments twice. Previously, a - statement like this: + was made to prevent the deep copy which was unnecessary when the + message was not actually logged. However, this resulted in the + macro evaluating the user arguments twice. Previously, a + statement like this: ACE_DEBUG ((LM_DEBUG, "timeout occured, iterations left %d\n", --iterations)); - <iterations> got reduced by one - now it got reduced by two ;-) + <iterations> got reduced by one - now it got reduced by two ;-) - The solution was to make a shallow copy of the filename in - conditional_set(). Then in log(), if the log priority is - correct, make a deep copy and then continue will log(). The - macros were changed to call conditional_set() instead of set(). + The solution was to make a shallow copy of the filename in + conditional_set(). Then in log(), if the log priority is + correct, make a deep copy and then continue will log(). The + macros were changed to call conditional_set() instead of set(). - Also, changed ACE_RETURN to specify all the parameters, - including <restart>, <callback>, and <stream>. Otherwise, the - default parameters of set() will end up losing these parameters - set by the user. + Also, changed ACE_RETURN to specify all the parameters, + including <restart>, <callback>, and <stream>. Otherwise, the + default parameters of set() will end up losing these parameters + set by the user. Thu Sep 28 15:49:00 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> - * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify - that the ACE_Time_Value's are *absolute* time. Thanks to Pedro - for reporting this. Thanks to Pedro Brandao - <pbrandao@inescn.pt> for reporting this. + * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify + that the ACE_Time_Value's are *absolute* time. Thanks to Pedro + for reporting this. Thanks to Pedro Brandao + <pbrandao@inescn.pt> for reporting this. Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/SOCK_Connector.cpp: ACE_Time_Value args to - ACE_SOCK_Connector could be const. Had to patch functions in - the following files to make it happen: + * ace/SOCK_Connector.cpp: ACE_Time_Value args to + ACE_SOCK_Connector could be const. Had to patch functions in + the following files to make it happen: - ACE.cpp - ACE.h - OS.h - OS.i - SOCK_Connector.cpp - SOCK_Connector.h + ACE.cpp + ACE.h + OS.h + OS.i + SOCK_Connector.cpp + SOCK_Connector.h - Thanks to Steve Huston <shuston@riverace.com> for reporting - this. This also fixes bug 673. + Thanks to Steve Huston <shuston@riverace.com> for reporting + this. This also fixes bug 673. Thu Sep 28 11:14:29 2000 Martin Stack <mstack@cambertx.com> - * ace/config-freebsd.h: - * ace/config-freebsd-pthread.h: - * ace/config-linux-common.h: - * ace/config-irix6.x-common.h: Added/Renamed to - ACE_USES_NEW_TERMIOS_STRUCT. + * ace/config-freebsd.h: + * ace/config-freebsd-pthread.h: + * ace/config-linux-common.h: + * ace/config-irix6.x-common.h: Added/Renamed to + ACE_USES_NEW_TERMIOS_STRUCT. - * ace/TTY_IO.h: - * ace/TTY_IO.cpp: The Win32 section was modified to implement a - non-blocking read when read-timeout=0 is set. Also, it was - modified to ensure proper operations when a read_timeout is - required. + * ace/TTY_IO.h: + * ace/TTY_IO.cpp: The Win32 section was modified to implement a + non-blocking read when read-timeout=0 is set. Also, it was + modified to ensure proper operations when a read_timeout is + required. - Code was added to enable the DTR line on both Win32 and unix - platforms when the port is opened. + Code was added to enable the DTR line on both Win32 and unix + platforms when the port is opened. - Several new flags where added to give proper access to the - serial device. + Several new flags where added to give proper access to the + serial device. - Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to - "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose. + Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to + "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose. Thu Sep 28 09:01:19 2000 Ossama Othman <ossama@uci.edu> - * ace/config-g++-common.h: + * ace/config-g++-common.h: - Reverted my g++ 2.95 updates. They work on all platforms but - the cross-compiler used for VxWorks. + Reverted my g++ 2.95 updates. They work on all platforms but + the cross-compiler used for VxWorks. Wed Sep 27 16:17:36 2000 Ossama Othman <ossama@uci.edu> - * ace/IOStream.h: - * ace/IOStream_T.h: + * ace/IOStream.h: + * ace/IOStream_T.h: - Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to - `IOStream_T.h'. They weren't needed in `IOStream.h'. + Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to + `IOStream_T.h'. They weren't needed in `IOStream.h'. - * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR, - ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES, - ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS): + * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR, + ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES, + ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS): - G++ 2.95.x properly support the auto_ptr class, templates with - static data members, and inlined template functions. + G++ 2.95.x properly support the auto_ptr class, templates with + static data members, and inlined template functions. Wed Sep 27 14:02:30 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * examples/Reactor/WFMO_Reactor/test_abandoned.cpp - (handle_timeout): Moved <--this->iterations_> outside the DEBUG - statement. + * examples/Reactor/WFMO_Reactor/test_abandoned.cpp + (handle_timeout): Moved <--this->iterations_> outside the DEBUG + statement. Wed Sep 27 08:46:12 2000 Carlos O'Ryan <coryan@uci.edu> diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index 549d519a4e6..0b20947f8fd 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,13 +1,68 @@ +Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu> + + * ace/RMCast/RMCast_Retransmission.h: + * ace/RMCast/RMCast_Retransmission.cpp: + Fixed several minor problems in the Retransmission module. + Use Copy_On_Write dispatching for synchronization of the + internal message buffer. + + * ace/RMCast/RMCast_Copy_On_Write.h: + * ace/RMCast/RMCast_Copy_On_Write.i: + * ace/RMCast/RMCast_Copy_On_Write.cpp: + Implement Copy_On_Write semantics for a collection like the + retransmission buffer. This is based on the ESF_Copy_On_Write + strategy developed for the event channel. + We should try to refactor this one in a reusable strategy. + + * ace/RMCast/RMCast_Worker.h: + * ace/RMCast/RMCast_Worker.i: + * ace/RMCast/RMCast_Worker.cpp: + To implement copy-on-write we need the "alternative" form of + iterators. + + * ace/RMCast/RMCast.h: + * ace/RMCast/RMCast_Fragment.h: + * ace/RMCast/RMCast_IO_UDP.h: + * ace/RMCast/RMCast_IO_UDP.cpp: + * ace/RMCast/RMCast_Membership.h: + * ace/RMCast/RMCast_Membership.i: + * ace/RMCast/RMCast_Membership.cpp: + * ace/RMCast/RMCast_Module.h: + * ace/RMCast/RMCast_Module_Factory.h: + * ace/RMCast/RMCast_Partial_Message.h: + * ace/RMCast/RMCast_Proxy.h: + * ace/RMCast/RMCast_Proxy.i: + * ace/RMCast/RMCast_Proxy.cpp: + * ace/RMCast/RMCast_UDP_Event_Handler.h: + * ace/RMCast/RMCast_UDP_Proxy.cpp: + * tests/RMCast/RMCast_Membership_Test.cpp: + Made the comments more doxygen friendly. + Renamed the highest_in_sequence field to next_expected, the new + name really reflects its semantics, this is the next sequence + number that the peer is expecting. All the previous numbers + have been either received or are simply assumed lost. + + * tests/Makefile: + If the rmcast makefile flag is set to 1 we build the RMCast + subdirectory too. + + * tests/RMCast/Makefile: + * tests/RMCast/RMCast_Tests.dsw: + * tests/RMCast/RMCast_Retransmission_Test.dsp: + * tests/RMCast/RMCast_Retransmission_Test.cpp: + New test for the Retransmission module + + Sun Oct 01 15:50:42 2000 Darrell Brunsch <brunsch@uci.edu> * bin/auto_run_tests.pl: Added support for a sandbox program that can shutdown a test - if it hangs. + if it hangs. Fri Sep 29 16:32:22 2000 Darrell Brunsch <brunsch@uci.edu> - * bin/msvc_auto_compile.pl: + * bin/msvc_auto_compile.pl: Added the BE and FE projects for TAO_IDL so the static version gets built correctly in auto_compiles. @@ -24,123 +79,123 @@ Fri Sep 29 16:17:34 2000 Steve Huston <shuston@riverace.com> Thu Sep 28 22:06:44 2000 Ossama Othman <ossama@uci.edu> - * ace/SSL/SSL_SOCK_Connector.h: - * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector, - shared_connect_start, shared_connect_finish, connect, complete): + * ace/SSL/SSL_SOCK_Connector.h: + * ace/SSL/SSL_SOCK_Connector.cpp (ACE_SSL_SOCK_Connector, + shared_connect_start, shared_connect_finish, connect, complete): - Made ACE_Time_Value arguments const to match the changes - detailed in: + Made ACE_Time_Value arguments const to match the changes + detailed in: - Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> + Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> Fri Sep 29 00:23:59 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/Log_Msg.h (ACE_RETURN): Fixed minor error. + * ace/Log_Msg.h (ACE_RETURN): Fixed minor error. Thu Sep 28 22:13:04 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/Log_Msg: Macros are evil! All the ACE debugging macros were - evaluating the user arguments twice, once in - log_priority_enabled() and the other in log(). The problem was - that set() was making a deep copy of the filename. Hence, this - change: + * ace/Log_Msg: Macros are evil! All the ACE debugging macros were + evaluating the user arguments twice, once in + log_priority_enabled() and the other in log(). The problem was + that set() was making a deep copy of the filename. Hence, this + change: - Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> + Wed May 3 11:43:05 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> - was made to prevent the deep copy which was unnecessary when the - message was not actually logged. However, this resulted in the - macro evaluating the user arguments twice. Previously, a - statement like this: + was made to prevent the deep copy which was unnecessary when the + message was not actually logged. However, this resulted in the + macro evaluating the user arguments twice. Previously, a + statement like this: ACE_DEBUG ((LM_DEBUG, "timeout occured, iterations left %d\n", --iterations)); - <iterations> got reduced by one - now it got reduced by two ;-) + <iterations> got reduced by one - now it got reduced by two ;-) - The solution was to make a shallow copy of the filename in - conditional_set(). Then in log(), if the log priority is - correct, make a deep copy and then continue will log(). The - macros were changed to call conditional_set() instead of set(). + The solution was to make a shallow copy of the filename in + conditional_set(). Then in log(), if the log priority is + correct, make a deep copy and then continue will log(). The + macros were changed to call conditional_set() instead of set(). - Also, changed ACE_RETURN to specify all the parameters, - including <restart>, <callback>, and <stream>. Otherwise, the - default parameters of set() will end up losing these parameters - set by the user. + Also, changed ACE_RETURN to specify all the parameters, + including <restart>, <callback>, and <stream>. Otherwise, the + default parameters of set() will end up losing these parameters + set by the user. Thu Sep 28 15:49:00 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu> - * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify - that the ACE_Time_Value's are *absolute* time. Thanks to Pedro - for reporting this. Thanks to Pedro Brandao - <pbrandao@inescn.pt> for reporting this. + * ace/Stream.h (class ACE_Stream): Updated the documentation to clarify + that the ACE_Time_Value's are *absolute* time. Thanks to Pedro + for reporting this. Thanks to Pedro Brandao + <pbrandao@inescn.pt> for reporting this. Thu Sep 28 15:02:36 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * ace/SOCK_Connector.cpp: ACE_Time_Value args to - ACE_SOCK_Connector could be const. Had to patch functions in - the following files to make it happen: + * ace/SOCK_Connector.cpp: ACE_Time_Value args to + ACE_SOCK_Connector could be const. Had to patch functions in + the following files to make it happen: - ACE.cpp - ACE.h - OS.h - OS.i - SOCK_Connector.cpp - SOCK_Connector.h + ACE.cpp + ACE.h + OS.h + OS.i + SOCK_Connector.cpp + SOCK_Connector.h - Thanks to Steve Huston <shuston@riverace.com> for reporting - this. This also fixes bug 673. + Thanks to Steve Huston <shuston@riverace.com> for reporting + this. This also fixes bug 673. Thu Sep 28 11:14:29 2000 Martin Stack <mstack@cambertx.com> - * ace/config-freebsd.h: - * ace/config-freebsd-pthread.h: - * ace/config-linux-common.h: - * ace/config-irix6.x-common.h: Added/Renamed to - ACE_USES_NEW_TERMIOS_STRUCT. + * ace/config-freebsd.h: + * ace/config-freebsd-pthread.h: + * ace/config-linux-common.h: + * ace/config-irix6.x-common.h: Added/Renamed to + ACE_USES_NEW_TERMIOS_STRUCT. - * ace/TTY_IO.h: - * ace/TTY_IO.cpp: The Win32 section was modified to implement a - non-blocking read when read-timeout=0 is set. Also, it was - modified to ensure proper operations when a read_timeout is - required. + * ace/TTY_IO.h: + * ace/TTY_IO.cpp: The Win32 section was modified to implement a + non-blocking read when read-timeout=0 is set. Also, it was + modified to ensure proper operations when a read_timeout is + required. - Code was added to enable the DTR line on both Win32 and unix - platforms when the port is opened. + Code was added to enable the DTR line on both Win32 and unix + platforms when the port is opened. - Several new flags where added to give proper access to the - serial device. + Several new flags where added to give proper access to the + serial device. - Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to - "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose. + Relabled macro "ACE_USES_OLD_TERMIOS_STRUCT" to + "ACE_USES_NEW_TERMIOS_STRUCT" to properly indicate its purpose. Thu Sep 28 09:01:19 2000 Ossama Othman <ossama@uci.edu> - * ace/config-g++-common.h: + * ace/config-g++-common.h: - Reverted my g++ 2.95 updates. They work on all platforms but - the cross-compiler used for VxWorks. + Reverted my g++ 2.95 updates. They work on all platforms but + the cross-compiler used for VxWorks. Wed Sep 27 16:17:36 2000 Ossama Othman <ossama@uci.edu> - * ace/IOStream.h: - * ace/IOStream_T.h: + * ace/IOStream.h: + * ace/IOStream_T.h: - Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to - `IOStream_T.h'. They weren't needed in `IOStream.h'. + Moved inclusion of `ace/INET_Addr.h' and `ace/Handle_Set.h' to + `IOStream_T.h'. They weren't needed in `IOStream.h'. - * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR, - ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES, - ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS): + * ace/config-g++-common.h (ACE_LACKS_AUTO_PTR, + ACE_LACKS_STATIC_DATA_MEMBER_TEMPLATES, + ACE_HAS_GNUC_BROKEN_TEMPLATE_INLINE_FUNCTIONS): - G++ 2.95.x properly support the auto_ptr class, templates with - static data members, and inlined template functions. + G++ 2.95.x properly support the auto_ptr class, templates with + static data members, and inlined template functions. Wed Sep 27 14:02:30 2000 Irfan Pyarali <irfan@cs.wustl.edu> - * examples/Reactor/WFMO_Reactor/test_abandoned.cpp - (handle_timeout): Moved <--this->iterations_> outside the DEBUG - statement. + * examples/Reactor/WFMO_Reactor/test_abandoned.cpp + (handle_timeout): Moved <--this->iterations_> outside the DEBUG + statement. Wed Sep 27 08:46:12 2000 Carlos O'Ryan <coryan@uci.edu> diff --git a/ace/RMCast/Makefile b/ace/RMCast/Makefile index ae13792c4c4..e97cd885493 100644 --- a/ace/RMCast/Makefile +++ b/ace/RMCast/Makefile @@ -636,17 +636,11 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - RMCast_Export.h RMCast.i RMCast_Module.i \ - $(ACE_ROOT)/ace/RB_Tree.h \ - $(ACE_ROOT)/ace/Functor.h \ + RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \ + RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \ + $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/Functor.i \ - $(ACE_ROOT)/ace/Functor_T.h \ - $(ACE_ROOT)/ace/Functor_T.i \ - $(ACE_ROOT)/ace/Functor_T.cpp \ - $(ACE_ROOT)/ace/RB_Tree.i \ - $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/Synch.i \ $(ACE_ROOT)/ace/Synch_T.h \ $(ACE_ROOT)/ace/Event_Handler.h \ @@ -664,6 +658,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Log_Record.h \ $(ACE_ROOT)/ace/Log_Priority.h \ $(ACE_ROOT)/ace/Log_Record.i \ + RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \ + $(ACE_ROOT)/ace/RB_Tree.h \ + $(ACE_ROOT)/ace/Functor.h \ + $(ACE_ROOT)/ace/Functor.i \ + $(ACE_ROOT)/ace/Functor_T.h \ + $(ACE_ROOT)/ace/Functor_T.i \ + $(ACE_ROOT)/ace/Functor_T.cpp \ + $(ACE_ROOT)/ace/RB_Tree.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h index abf2a24e946..df3a0d48858 100644 --- a/ace/RMCast/RMCast.h +++ b/ace/RMCast/RMCast.h @@ -233,16 +233,19 @@ public: * * This message is used to provide feedback information to senders. * It contains two sequence numbers: - * - highest_in_sequence: is the sequence number of the last message - * received without any lost messages before it - * - highest_received: is the sequence number of the last_message - * successfully received, there may be some messages lost before it + * - \param next_expected: is the sequence number of the next message + * expected, i.e. (next_expected-1) is the last message received + * without any losses before it. + * - \param highest_received: is the highest sequence number among + * all the messages successfully received. + * In other words, all messages lost (if any) are in the range: + * [next_expected,highest_received) * * <CODE> * +---------+----------------------+<BR> * | 8 bits | MT_ACK |<BR> * +---------+----------------------+<BR> - * | 32 bits | highest_in_sequence |<BR> + * | 32 bits | next_expected |<BR> * +---------+----------------------+<BR> * | 32 bits | highest_received |<BR> * +---------+----------------------+<BR> @@ -251,7 +254,7 @@ public: struct Ack { //! The last message received without any losses before it. - ACE_UINT32 highest_in_sequence; + ACE_UINT32 next_expected; //! The last message successfully received ACE_UINT32 highest_received; diff --git a/ace/RMCast/RMCast_Copy_On_Write.cpp b/ace/RMCast/RMCast_Copy_On_Write.cpp new file mode 100644 index 00000000000..f1553c7f4ab --- /dev/null +++ b/ace/RMCast/RMCast_Copy_On_Write.cpp @@ -0,0 +1,176 @@ +// $Id$ + +#ifndef ACE_RMCAST_COPY_ON_WRITE_CPP +#define ACE_RMCAST_COPY_ON_WRITE_CPP + +#include "RMCast_Copy_On_Write.h" + +#if ! defined (__ACE_INLINE__) +#include "RMCast_Copy_On_Write.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(RMCast, RMCast_Copy_On_Write, "$Id$") + +template<class COLLECTION, class ITERATOR> void +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_incr_refcnt (void) +{ + // LOCKING: no locking is required, the caller grabs the mutex. + this->refcount_++; +} + +template<class COLLECTION, class ITERATOR> void +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void) +{ + // LOCKING: no locking is required, the caller grabs the mutex. + { + this->refcount_--; + if (this->refcount_ != 0) + return; + } + //@@ TODO: If this wrapper is going to be completely general some + // kind of functor has to be provided to remove the elements in the + // collection, in case the are no self-managed + + delete this; +} + +// **************************************************************** + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write (void) + : pending_writes_ (0) + , writing_ (0) + , cond_ (mutex_) +{ + ACE_NEW (this->collection_, Collection); +} + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write (void) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + + while (this->pending_writes_ != 0) + this->cond_.wait (); + + this->collection_->_decr_refcnt (); + this->collection_ = 0; +} + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + for_each (ACE_RMCast_Worker<KEY,ITEM> *worker) +{ + Read_Guard ace_mon (this->mutex_, this->collection_); + + ITERATOR end = ace_mon.collection->collection.end (); + for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i) + { + int r = worker->work ((*i).key (), (*i).item ()); + if (r != 0) + return r; + } + return 0; +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, + ITEM const & i) +{ + Write_Guard ace_mon (this->mutex_, + this->cond_, + this->pending_writes_, + this->writing_, + this->collection_); + + return this->bind_i (ace_mon, k, i); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k) +{ + Write_Guard ace_mon (this->mutex_, + this->cond_, + this->pending_writes_, + this->writing_, + this->collection_); + + return this->unbind_i (ace_mon, k); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon, + KEY const & k, + ITEM const & i) +{ + return ace_mon.copy->collection.bind (k, i); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon, + KEY const & k) +{ + return ace_mon.copy->collection.unbind (k); +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m, + ACE_SYNCH_CONDITION &c, + int &p, + int &w, + Collection*& cr) + : copy (0) + , mutex (m) + , cond (c) + , pending_writes (p) + , writing_flag (w) + , collection (cr) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); + + this->pending_writes++; + + while (this->writing_flag != 0) + this->cond.wait (); + + this->writing_flag = 1; + } + + // Copy outside the mutex, because it may take a long time. + // Nobody can change it, because it is protected by the + // writing_flag. + + // First initialize it (with the correct reference count + ACE_NEW (this->copy, Collection); + // Copy the contents + this->copy->collection = this->collection->collection; +} + +template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write_Write_Guard (void) +{ + Collection *tmp = 0; + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); + + tmp = this->collection; + this->collection = this->copy; + this->writing_flag = 0; + this->pending_writes--; + + this->cond.signal (); + } + // Delete outside the mutex, because it may take a long time. + tmp->_decr_refcnt (); +} + +// **************************************************************** + +#endif /* ACE_RMCAST_COPY_ON_WRITE_CPP */ diff --git a/ace/RMCast/RMCast_Copy_On_Write.h b/ace/RMCast/RMCast_Copy_On_Write.h new file mode 100644 index 00000000000..8724e23a5d5 --- /dev/null +++ b/ace/RMCast/RMCast_Copy_On_Write.h @@ -0,0 +1,173 @@ +/* -*- C++ -*- */ +// $Id$ +// + +#ifndef ACE_RMCAST_COPY_ON_WRITE_H +#define ACE_RMCAST_COPY_ON_WRITE_H +#include "ace/pre.h" + +#include "RMCast_Worker.h" +#include "ace/Synch.h" + +//! A wrapper to implement reference counted collections +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Collection +{ +public: + //! Constructor + ACE_RMCast_Copy_On_Write_Collection (void); + + //! Increment the reference count + void _incr_refcnt (void); + + //! Decrement the reference count + void _decr_refcnt (void); + + //! The actual collection + COLLECTION collection; + +private: + //! The reference count + ACE_UINT32 refcount_; +}; + +// **************************************************************** + +//! Implement a read guard for a reference counted collection +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Read_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex, + Collection *&collection); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Read_Guard (void); + + //! A reference to the collection + Collection *collection; + +private: + //! Synchronization + ACE_SYNCH_MUTEX &mutex_; +}; + +// **************************************************************** + +//! Implement the write guard for a reference counted collecion +/*! + * This helper class atomically increments the reference count of a + * ACE_RMCast_Copy_On_Write_Collection and reads the current + * collection in the Copy_On_Write class. + */ +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Write_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex, + ACE_SYNCH_CONDITION &cond, + int &pending_writes, + int &writing_flag, + Collection*& collection); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Write_Guard (void); + + //! The collection + Collection *copy; + +private: + //! Keep a reference to the mutex + ACE_SYNCH_MUTEX &mutex; + + //! Keep a reference to the condition variable + ACE_SYNCH_CONDITION &cond; + + //! Use a reference to update the pending writes count + int &pending_writes; + + //! Use a reference to update the writing flag + int &writing_flag; + + //! Use this reference to update the collection once the + //! modifications are finished. + Collection *&collection; +}; + +// **************************************************************** + +//! Implement a copy on write wrapper for a map-like collection +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write +{ +public: + //! The Read_Guard trait + typedef ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR> Read_Guard; + + //! The Write_Guard trait + typedef ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR> Write_Guard; + + //! The underlying collection type + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write (void); + + //! Destructor + ~ACE_RMCast_Copy_On_Write (void); + + //! Iterate over all the elements invoking \param worker on each one. + int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker); + + //! Add a new element + int bind (KEY const & key, ITEM const & item); + + //! Remove an element + int unbind (KEY const & key); + + //! Bind assuming the Write_Guard is held + int bind_i (Write_Guard &guard, KEY const & key, ITEM const & item); + + //! Unbind assuming the Write_Guard is held + int unbind_i (Write_Guard &guard, KEY const & key); + + //! Number of pending writes + int pending_writes_; + + //! If non-zero then a thread is changing the collection. + /*! + * Many threads can use the collection simulatenously, but only one + * change it. + */ + int writing_; + + //! A mutex to serialize access to the collection pointer. + ACE_SYNCH_MUTEX mutex_; + + //! A condition variable to wait to synchronize multiple writers. + ACE_SYNCH_CONDITION cond_; + + //! The collection, with reference counting added + Collection *collection_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Copy_On_Write.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Copy_On_Write.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Copy_On_Write.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_COPY_ON_WRITE_H */ diff --git a/ace/RMCast/RMCast_Copy_On_Write.i b/ace/RMCast/RMCast_Copy_On_Write.i new file mode 100644 index 00000000000..c6e5099cda5 --- /dev/null +++ b/ace/RMCast/RMCast_Copy_On_Write.i @@ -0,0 +1,36 @@ +// $Id$ + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Collection (void) + : refcount_ (1) +{ +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m, + Collection*& collection_ref) + : collection (0) + , mutex_ (m) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->collection = collection_ref; + this->collection->_incr_refcnt (); +} + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write_Read_Guard (void) +{ + if (this->collection != 0) + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->collection->_decr_refcnt (); + } +} + +// **************************************************************** + diff --git a/ace/RMCast/RMCast_Fragment.h b/ace/RMCast/RMCast_Fragment.h index 7b64d763ebc..eed08c92517 100644 --- a/ace/RMCast/RMCast_Fragment.h +++ b/ace/RMCast/RMCast_Fragment.h @@ -1,15 +1,5 @@ // $Id$ -// ============================================================================ -// -// = DESCRIPTION -// The fragmentation task for the reliable multicast library -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - #ifndef ACE_RMCAST_FRAGMENT_H #define ACE_RMCAST_FRAGMENT_H #include "ace/pre.h" @@ -21,28 +11,46 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +//! Default fragment size #ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE # define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024 #endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */ +//! Fragmentation module +/*! + * Some transports cannot send very big messages, for example UDP + * imposes a limit of 64K, and in practice the limit is even more + * strict than that. + * This class decomposes a message into multiple fragments, using an + * application defined maximum size. + */ class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_RMCast_Module { public: + //! Constructor ACE_RMCast_Fragment (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Fragment (void); - // Destructor + //! Accessor for the max_fragment size. + /*! There is no modifier, the maximum fragment size is obtained + * using feedback from the lower layers (transport?) + * @@TODO We have not implemented the feedback mechanisms yet! + */ size_t max_fragment_size (void) const; - // Accessor for the max_fragment size. - // There is no modifier, the maximum fragment size is obtained using - // feedback from the lower layer (transport?) - // = The ACE_RMCast_Module methods + /*! + * Only data messages need fragmentation, the control messages are + * all small enough for all the transports that I know about. + * Well, actually for CAN-Bus (Controller Area Network), they may be + * too big, because the max payload there is 8 bytes, but we don't + * play with those in ACE. + */ virtual int data (ACE_RMCast::Data &data); private: + //! Current fragment size limit size_t max_fragment_size_; }; diff --git a/ace/RMCast/RMCast_IO_UDP.cpp b/ace/RMCast/RMCast_IO_UDP.cpp index af655f3130f..421982d5ad6 100644 --- a/ace/RMCast/RMCast_IO_UDP.cpp +++ b/ace/RMCast/RMCast_IO_UDP.cpp @@ -354,7 +354,7 @@ ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack, char header[16]; header[0] = ACE_RMCast::MT_ACK; - ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence); + ACE_UINT32 tmp = ACE_HTONL (ack.next_expected); ACE_OS::memcpy (header + 1, &tmp, sizeof(ACE_UINT32)); tmp = ACE_HTONL (ack.highest_received); diff --git a/ace/RMCast/RMCast_IO_UDP.h b/ace/RMCast/RMCast_IO_UDP.h index bdcccabe6e1..5af403bf994 100644 --- a/ace/RMCast/RMCast_IO_UDP.h +++ b/ace/RMCast/RMCast_IO_UDP.h @@ -33,44 +33,65 @@ class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module { public: + //! Constructor + /*! + * The <factory> argument is used to create the modules for each + * proxy that process incoming messages. The class does *not* assume + * ownership of <factory>, the caller owns it. But it does assume + * ownership of the modules returned by the factory, and it may ask + * the factory to release them eventually. + */ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory); - // Constructor - // <factory> is used to create the modules for each proxy that - // process incoming messages. The class does *not* assume ownership - // of <factory>, the caller owns it. + //! Destructor ~ACE_RMCast_IO_UDP (void); - // Destructor + //! Join a new multicast group + /*! + * Start receiving data for the <mcast_addr> multicast group. + * Please read the documentation of ACE_SOCK_Dgram_Mcast for more + * details. + */ int subscribe (const ACE_INET_Addr &mcast_addr, int reuse_addr = 1, const ACE_TCHAR *net_if = 0, int protocol_family = PF_INET, int protocol = 0); - // Start receiving data for the <mcast_addr> multicast group. - // Please read the documentation of <ACE_SOCK_Dgram_Mcast> for more - // details. // The class can be used with a Reactor or using blocking I/O // depending on what method of the following two is called. + //! Wait for events for the period <tv>. If <tv> is zero it blocks + //! forever. int handle_events (ACE_Time_Value *tv = 0); - // Wait for events for the period <tv>. If <tv> is zero it blocks - // forever. + //! Register any event handlers into <reactor> + /*! + * @@TODO: This should be left for the clients of the class, there + * is no reason why this class must know about reactors. + */ int register_handlers (ACE_Reactor *reactor); - // Register any event handlers into <reactor> + //! Remove all the handlers from the reactor + /*! + * @@TODO: This should be left for the clients of the class, there + * is no reason why this class must know about reactors. + */ int remove_handlers (void); - // Remove all the handlers from the reactor + //! There is data to read, read it and process it. int handle_input (ACE_HANDLE h); - // There is data to read, read it and process it. + //! Obtain the handle for the underlying socket ACE_HANDLE get_handle (void) const; - // Obtain the handle for the underlying socket - // Send back to the remove object represented by <proxy> + //@{ + //! Send the message to the ACE_INET_Addr argument. + /*! + * These methods are used in the implementation of the + * ACE_RMCast_UDP_Proxy objects and the implementation of the + * inherited ACE_RMCast_Module methods in this class. + */ int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &); int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &); int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &); @@ -78,8 +99,9 @@ public: int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &); int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &); int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &); + //@} - // = The RMCast_Module methods + // Please read the documentation in ACE_RMCast_Module for more details virtual int data (ACE_RMCast::Data &); virtual int poll (ACE_RMCast::Poll &); virtual int ack_join (ACE_RMCast::Ack_Join &); @@ -87,23 +109,24 @@ public: virtual int ack (ACE_RMCast::Ack &); virtual int join (ACE_RMCast::Join &); virtual int leave (ACE_RMCast::Leave &); - // The messages are sent to the multicast group private: + //! The factory used to create the modules attached to each proxy ACE_RMCast_Module_Factory *factory_; - // The factory used to create the modules attached to each proxy + //! The multicast group we subscribe and send to ACE_INET_Addr mcast_group_; - // The multicast group we subscribe and send to + //! The socket used to receive and send data ACE_SOCK_Dgram_Mcast dgram_; - // The socket + //! Use a Hash_Map to maintain the collection of proxies typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map; + //! The collection of proxies Map map_; + //! The event handler adapter ACE_RMCast_UDP_Event_Handler eh_; - // The event handler adapter }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Membership.cpp b/ace/RMCast/RMCast_Membership.cpp index 6ee2690a41f..a23d7a756e5 100644 --- a/ace/RMCast/RMCast_Membership.cpp +++ b/ace/RMCast/RMCast_Membership.cpp @@ -28,14 +28,14 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) ACE_RMCast::Ack next_ack; { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - if (ack.highest_in_sequence < this->highest_in_sequence_) + if (ack.next_expected < this->next_expected_) { // @@ This violates an invariant of the class, shouldn't // happen... // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n")); return -1; } - else if (ack.highest_in_sequence == this->highest_in_sequence_) + else if (ack.next_expected == this->next_expected_) { // Nothing new, just continue.... // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n")); @@ -43,21 +43,23 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) } // Possible update, re-evaluate the story... - ACE_UINT32 highest_in_sequence = (*i)->highest_in_sequence (); + ACE_UINT32 next_expected = (*i)->next_expected (); ACE_UINT32 highest_received = (*i)->highest_received (); ++i; for (; i != end; ++i) { - ACE_UINT32 s = (*i)->highest_in_sequence (); - if (s < highest_in_sequence) - highest_in_sequence = s; + ACE_UINT32 s = (*i)->next_expected (); + if (s < next_expected) + next_expected = s; ACE_UINT32 r = (*i)->highest_received (); if (r > highest_received) highest_received = r; } #if 0 - if (this->highest_in_sequence_ >= highest_in_sequence + // @@TODO: this is an important feature, disabled until it is + // fully debugged + if (this->next_expected_ >= next_expected || this->highest_received_ >= highest_received) { // No change.... @@ -65,12 +67,12 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) return 0; } #endif /* 0 */ - this->highest_in_sequence_ = highest_in_sequence; + this->next_expected_ = next_expected; this->highest_received_ = highest_received; if (this->next () == 0) return 0; next_ack.source = ack.source; - next_ack.highest_in_sequence = this->highest_in_sequence_; + next_ack.next_expected = this->next_expected_; next_ack.highest_received = this->highest_received_; } // @@ This looks like a race condition, next() is checked inside the @@ -89,6 +91,8 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join) ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); if (this->proxies_.insert (join.source) == -1) return -1; + // @@TODO: This may change the next Ack to send up, should + // recompute and send the right message if that was the case. } return this->ACE_RMCast_Module::join (join); @@ -103,6 +107,8 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave) { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); (void) this->proxies_.remove (leave.source); + // @@TODO: This may change the next Ack to send up, should + // recompute and send the right message if that was the case. } return this->ACE_RMCast_Module::leave (leave); diff --git a/ace/RMCast/RMCast_Membership.h b/ace/RMCast/RMCast_Membership.h index a99a7752507..21ee1bea97b 100644 --- a/ace/RMCast/RMCast_Membership.h +++ b/ace/RMCast/RMCast_Membership.h @@ -28,41 +28,59 @@ class ACE_RMCast_Proxy; +//! Track peer membership +/*! + * Reliable senders of events need to know exactly how many peers are + * receiving the events, and how many events has each peer received so + * far. + * This class uses the Join, Leave and Ack messages to build that + * information, it also summarizes the Ack events and propagate only + * the global info to the upper layer. + */ class ACE_RMCast_Export ACE_RMCast_Membership : public ACE_RMCast_Module { - // = TITLE - // Track Receiver membership - // - // = DESCRIPTION - // Define the interface for all reliable multicast membership public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Membership (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Membership (void); - // Destructor - // = The RMCast_Module methods + //! Receive an process an Ack message + /*! + * After receiving the Ack message we find out what is the lowest + * sequence number received in order among all the acks received by + * the proxies in the collection. We also find out what is the + * highest sequence number received by any proxy. + * We only propagate that information back to the upper layer, and + * then only if there are any news since the last Ack. + */ virtual int ack (ACE_RMCast::Ack &); + + //! Add a new member to the collection, using the <source> field in + //! the Join message virtual int join (ACE_RMCast::Join &); + + //! Remove a member from the collection, using the <source> field in + //! the Join message virtual int leave (ACE_RMCast::Leave &); protected: + //! Use an unbounded set to maintain the collection of proxies. typedef ACE_Unbounded_Set<ACE_RMCast_Proxy*> Proxy_Collection; typedef ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*> Proxy_Iterator; + //! The collection of proxies Proxy_Collection proxies_; - // The membership buffer - ACE_UINT32 highest_in_sequence_; - // The smallest value of <highest_in_sequence> for all the proxies + //! The smallest value of \param next_expected for all the proxies + ACE_UINT32 next_expected_; + //! The highest value of \param highest_received for all the proxies ACE_UINT32 highest_received_; - // The highest value of <highest_received> for all the proxies + //! Synchronization ACE_SYNCH_MUTEX mutex_; - // Synchronization }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Membership.i b/ace/RMCast/RMCast_Membership.i index 0c3e33c2d01..b513c2d5141 100644 --- a/ace/RMCast/RMCast_Membership.i +++ b/ace/RMCast/RMCast_Membership.i @@ -2,7 +2,7 @@ ACE_INLINE ACE_RMCast_Membership::ACE_RMCast_Membership (void) - : highest_in_sequence_ (0) + : next_expected_ (0) , highest_received_ (0) { } diff --git a/ace/RMCast/RMCast_Module.h b/ace/RMCast/RMCast_Module.h index dc4077fa4ab..fad76caac53 100644 --- a/ace/RMCast/RMCast_Module.h +++ b/ace/RMCast/RMCast_Module.h @@ -36,55 +36,55 @@ class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_Module { public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Module (void); - //!< Constructor + //! Destructor virtual ~ACE_RMCast_Module (void); - //!< Destructor + //! Modifier for the next element in the stack virtual int next (ACE_RMCast_Module *next); - //!< Modifier for the next element in the stack + //! Accesor for the next element in the stack virtual ACE_RMCast_Module* next (void) const; - //!< Accesor for the next element in the stack + //! Modifier for the previous element in the stack virtual int prev (ACE_RMCast_Module *prev); - //!< Modifier for the previous element in the stack + //! Accesor for the previous element in the stack virtual ACE_RMCast_Module* prev (void) const; - //!< Accesor for the previous element in the stack + //! Initialize the module, setting up the next module virtual int open (void); - //!< Initialize the module, setting up the next module + //! Close the module. virtual int close (void); - //!< Close the module. + //! Push data through the stack virtual int data (ACE_RMCast::Data &); - //!< Push data through the stack + //! Push a polling request through the stack virtual int poll (ACE_RMCast::Poll &); - //!< Push a polling request through the stack + //! Push a message to ack a join request through the stack virtual int ack_join (ACE_RMCast::Ack_Join &); - //!< Push a message to ack a join request through the stack + //! Push a message to ack a leave request through the stack virtual int ack_leave (ACE_RMCast::Ack_Leave &); - //!< Push a message to ack a leave request through the stack + //! Push an ack mesage through the stack virtual int ack (ACE_RMCast::Ack &); - //!< Push an ack mesage through the stack + //! Push a join message through the stack virtual int join (ACE_RMCast::Join &); - //!< Push a join message through the stack + //! Push a leave message through the stack virtual int leave (ACE_RMCast::Leave &); - //!< Push a leave message through the stack private: //! The next element in the stack ACE_RMCast_Module *next_; + //! The previous element in the stack ACE_RMCast_Module *prev_; }; diff --git a/ace/RMCast/RMCast_Module_Factory.h b/ace/RMCast/RMCast_Module_Factory.h index 722ad87d678..f0ea58df0e5 100644 --- a/ace/RMCast/RMCast_Module_Factory.h +++ b/ace/RMCast/RMCast_Module_Factory.h @@ -27,19 +27,40 @@ class ACE_RMCast_Module; class ACE_RMCast_IO_UDP; +//! Create Module stacks +/*! + * Different application will probably require different + * configurations in their Module stack, some will just want best + * effort semantics. Others will use Reliable communication with a + * maximum retransmission time. Furthermore, applications may want to + * receive messages in send order, or just as soon as they are + * received. + * Obviously most applications will want to change want happens once a + * message is completely received. + * + * To achieve all this flexibility the IO layer uses this factory to + * create the full stack of Modules corresponding to a single + * consumer. + * To keep the complexity under control the intention is to create + * helper Factories, such as Reliable_Module_Factory where + * applications only need to customize a few features. + */ class ACE_RMCast_Export ACE_RMCast_Module_Factory { - // = DESCRIPTION - // public: + //! Destructor virtual ~ACE_RMCast_Module_Factory (void); - // Destructor + //! Create a new proxy virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0; - // Create a new proxy + //! Destroy a proxy + /*! + * Some factories may allocate modules from a pool, or return the + * same module for all proxies. Consequently, only the factory + * knows how to destroy them. + */ virtual void destroy (ACE_RMCast_Module *) = 0; - // Destroy a proxy }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Partial_Message.h b/ace/RMCast/RMCast_Partial_Message.h index 9b71eb4a541..88fa9ab2f1a 100644 --- a/ace/RMCast/RMCast_Partial_Message.h +++ b/ace/RMCast/RMCast_Partial_Message.h @@ -26,44 +26,72 @@ #define ACE_RMCAST_DEFAULT_HOLE_COUNT 16 #endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */ +//! Represent a partially received message in the +//! ACE_RMCast_Reassembly module +/*! + * This class provides temporary storage for the fragments as they are + * received in the ACE_RMCast_Reassembly module. It also keeps track + * of what portions of the message are still missing. + */ class ACE_RMCast_Export ACE_RMCast_Partial_Message { public: + //! Constructor, reserve enough memory for the complete message ACE_RMCast_Partial_Message (ACE_UINT32 message_size); + + //! Destructor ~ACE_RMCast_Partial_Message (void); + //! Process a fragment + /*! + * A fragment starting at <offset> has been received, copy the + * fragment contents and update the list of holes. + */ int fragment_received (ACE_UINT32 message_size, ACE_UINT32 offset, ACE_Message_Block *mb); + + //! Return 1 if the message is complete int is_complete (void) const; + //! Return the body of the message, the memory is *not* owned by the + //! caller ACE_Message_Block *message_body (void); - // Return the body of the message, the memory is owned by the - // class. private: + //! Insert a new hole into the list + /*! + * The class keeps an array to represent the missing portions of the + * message. This method inserts a new hole, i.e. a new element in + * the array at index <i>. The <start> and <end> arguments represent + * the offsets of the missing portion of the message. + */ int insert_hole (size_t i, ACE_UINT32 start, ACE_UINT32 end); - // Insert a new hole into the list + //! Remove a hole from the list int remove_hole (size_t i); - // Remove a hole from the list private: + //! Maintain the message storage ACE_Message_Block message_body_; - // Used to rebuild the body of the message + //! Represent a missing portion of a message struct Hole { + //! Offset where the missing portion of the message starts ACE_UINT32 start; + //! Offset where the missing portion of the message ends ACE_UINT32 end; }; + //! Implement a growing array of Hole structures + //@{ Hole *hole_list_; size_t max_hole_count_; size_t hole_count_; - // The current list of holes in the message_body. + //@} }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_Proxy.cpp b/ace/RMCast/RMCast_Proxy.cpp index 53d9d0b6726..f6b2bbec5e5 100644 --- a/ace/RMCast/RMCast_Proxy.cpp +++ b/ace/RMCast/RMCast_Proxy.cpp @@ -15,9 +15,9 @@ ACE_RMCast_Proxy::~ACE_RMCast_Proxy (void) } ACE_UINT32 -ACE_RMCast_Proxy::highest_in_sequence (void) const +ACE_RMCast_Proxy::next_expected (void) const { - return this->highest_in_sequence_; + return this->next_expected_; } ACE_UINT32 @@ -29,7 +29,7 @@ ACE_RMCast_Proxy::highest_received (void) const int ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack) { - this->highest_in_sequence_ = ack.highest_in_sequence; + this->next_expected_ = ack.next_expected; this->highest_received_ = ack.highest_received; return this->ACE_RMCast_Module::ack (ack); } diff --git a/ace/RMCast/RMCast_Proxy.h b/ace/RMCast/RMCast_Proxy.h index 414b74174fb..e0e6afe79b1 100644 --- a/ace/RMCast/RMCast_Proxy.h +++ b/ace/RMCast/RMCast_Proxy.h @@ -48,27 +48,28 @@ public: //! Destructor virtual ~ACE_RMCast_Proxy (void); - - //! Return the highest sequence number received without any losses - //! before it. Only applies to remote receiver proxies. + + //! Return the next sequence number expected by the peer. Only + //! applies to remote receiver proxies. /*! - Please read the documentation in ACE_RMCast::Ack + * Please read the documentation in ACE_RMCast::Ack */ - virtual ACE_UINT32 highest_in_sequence (void) const; + virtual ACE_UINT32 next_expected (void) const; //! Return the highest sequence number successfully received. //! Only applies to remote receiver proxies. /*! - Please read the documentation in ACE_RMCast::Ack + * Please read the documentation in ACE_RMCast::Ack */ virtual ACE_UINT32 highest_received (void) const; //@{ //! Send messages directly to the peer. - /*! Send a message directly to the peer, i.e. the message is not - sent through the multicast group and it may not be processed by - all the layers in the stack. - */ + /*! + * Send a message directly to the peer, i.e. the message is not + * sent through the multicast group and it may not be processed by + * all the layers in the stack. + */ virtual int reply_data (ACE_RMCast::Data &) = 0; virtual int reply_poll (ACE_RMCast::Poll &) = 0; virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0; @@ -79,8 +80,8 @@ public: //@} /*! - Proxies process the ACK sequence numbers to save the sequence - numbers reported from the remote peer. + * Proxies process the ACK sequence numbers to cache the ack + * information from the peer. */ virtual int ack (ACE_RMCast::Ack &); @@ -88,7 +89,7 @@ private: //@{ //! Cache the sequence numbers reported from the remote peer using //! Ack messages - ACE_UINT32 highest_in_sequence_; + ACE_UINT32 next_expected_; ACE_UINT32 highest_received_; //@} }; diff --git a/ace/RMCast/RMCast_Proxy.i b/ace/RMCast/RMCast_Proxy.i index f93feaa5639..6fee09fe9e5 100644 --- a/ace/RMCast/RMCast_Proxy.i +++ b/ace/RMCast/RMCast_Proxy.i @@ -2,7 +2,7 @@ ACE_INLINE ACE_RMCast_Proxy::ACE_RMCast_Proxy (void) - : highest_in_sequence_ (0) + : next_expected_ (0) , highest_received_ (0) { } diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp index a996e1204d5..7e38cdf7c97 100644 --- a/ace/RMCast/RMCast_Retransmission.cpp +++ b/ace/RMCast/RMCast_Retransmission.cpp @@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void) { } +class ACE_RMCast_Resend_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next, + ACE_UINT32 max_sequence_number) + : n (0) + , next_ (next) + , max_sequence_number_ (max_sequence_number) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const & item) + { + if (key > this->max_sequence_number_) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::resend - message %d resent\n", + key)); + ACE_RMCast::Data data = item; + int r = this->next_->data (data); + if (r != 0) + return r; + n++; + return 0; + } + + int n; + +private: + ACE_RMCast_Module *next_; + + ACE_UINT32 max_sequence_number_; +}; + int -ACE_RMCast_Retransmission::close (void) +ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number) { - Messages_Iterator end = this->messages_.end (); + if (this->next () == 0) + return 0; - for (Messages_Iterator i = this->messages_.begin (); - i != end; - ++i) - { - ACE_Message_Block::release ((*i).item ().payload); - } - this->messages_.close (); + ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number); + + if (this->messages_.for_each (&worker) == -1) + return -1; + + return worker.n; +} + +int +ACE_RMCast_Retransmission::close (void) +{ + // @@ return 0; } @@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data) int r = this->next ()->data (data); if (r == 0) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ACE_RMCast::Data copy = data; copy.payload = ACE_Message_Block::duplicate (data.payload); r = this->messages_.bind (data.sequence_number, copy); @@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; ACE_RMCast::Ack_Join ack_join; +#if 0 + // @@ { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); Messages_Iterator end = this->messages_.end (); Messages_Iterator begin = this->messages_.begin (); @@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) ack_join.next_sequence_number = (*begin).key (); } } +#endif (void) join.source->reply_ack_join (ack_join); // @@ We should force a full retransmission of all the messages! @@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; } +class ACE_RMCast_Ack_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack, + ACE_RMCast_Retransmission::Messages::Write_Guard &g, + ACE_RMCast_Retransmission::Messages *messages) + : ack_ (ack) + , ace_mon_ (g) + , messages_ (messages) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const &) + { + if (key >= this->ack_.next_expected) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::ack - message %d erased\n", + key)); + return this->messages_->unbind_i (this->ace_mon_, key); + } + +private: + ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&); + ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&); + +private: + ACE_RMCast::Ack &ack_; + + ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_; + + ACE_RMCast_Retransmission::Messages *messages_; +}; + int ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - for (Messages_Iterator i = this->messages_.begin (); - i != this->messages_.end (); - /* do nothing */) - { - ACE_UINT32 key = (*i).key (); - if (key > ack.highest_in_sequence) - break; - this->messages_.unbind (key); - } - return 0; + Messages::Write_Guard ace_mon (this->messages_.mutex_, + this->messages_.cond_, + this->messages_.pending_writes_, + this->messages_.writing_, + this->messages_.collection_); + + ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_); + + return this->messages_.for_each (&worker); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) @@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; +template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>; + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/RMCast/RMCast_Retransmission.h b/ace/RMCast/RMCast_Retransmission.h index 7c586fe5dd6..b7bc20d2914 100644 --- a/ace/RMCast/RMCast_Retransmission.h +++ b/ace/RMCast/RMCast_Retransmission.h @@ -19,6 +19,7 @@ #include "ace/pre.h" #include "RMCast_Module.h" +#include "RMCast_Copy_On_Write.h" #include "ace/RB_Tree.h" #include "ace/Synch.h" @@ -26,38 +27,72 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +//! Store messages for retransmission in reliable configurations +/*! + * Reliable configurations of the RMCast framework need to store + * messages on the sender side to resend them if one or more clients + * do not receive them successfully. + */ class ACE_RMCast_Export ACE_RMCast_Retransmission : public ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Retransmission - // - // = DESCRIPTION - // Define the interface for all reliable multicast retransmission public: // = Initialization and termination methods. + //! Constructor ACE_RMCast_Retransmission (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Retransmission (void); - // Destructor - // = The RMCast_Module methods + //! Use a Red-Black Tree to keep the queue of messages + typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection; + typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection_Iterator; + + //! The messages are stored in the Copy_On_Write wrapper to provide + //! an efficient, but thread safe interface. + typedef ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,Collection,Collection_Iterator> Messages; + + //! Resend messages + /*! + * Resends all the messages up to \param max_sequence_number + * Returns the number of messages sent, or -1 if there where any + * errors. + */ + int resend (ACE_UINT32 max_sequence_number); + + //! Cleanup all the stored messages virtual int close (void); + + //! Pass the message downstream, but also save it in the + //! retransmission queue + /*! + * Sequence number are assigned by the ACE_RMCast_Fragmentation + * class, consequently this class first passes the message + * downstream, to obtain the sequence number and then stores the + * message for later retransmission. + */ virtual int data (ACE_RMCast::Data &data); + + //! Process an Ack message from the remote receivers. + /*! + * Normally this Ack message will be a summary of all the Ack + * messages received by the ACE_RMCast_Membership class + */ virtual int ack (ACE_RMCast::Ack &); + + //! Detect when new members join the group and Ack_Join them + /*! + * When a new receiver joins the group this module sends an Ack_Join + * message with the next sequence number that the receiver should + * expect. + * The sequence number is obtained from the current list of cached + * messages. + */ virtual int join (ACE_RMCast::Join &); protected: - typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> - Messages; - typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> - Messages_Iterator; + //! The retransmission buffer Messages messages_; - // The retransmission buffer - - ACE_SYNCH_MUTEX mutex_; - // Synchronization }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_UDP_Event_Handler.h b/ace/RMCast/RMCast_UDP_Event_Handler.h index 02798cee7f8..2a6e7c45d42 100644 --- a/ace/RMCast/RMCast_UDP_Event_Handler.h +++ b/ace/RMCast/RMCast_UDP_Event_Handler.h @@ -1,16 +1,5 @@ // $Id$ -// ============================================================================ -// -// = DESCRIPTION -// Implement an adapter between the ACE Reactor and the -// ACE_RMCast_IO_UDP -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - #ifndef ACE_RMCAST_UDP_EVENT_HANDLER_H #define ACE_RMCAST_UDP_EVENT_HANDLER_H #include "ace/pre.h" @@ -25,24 +14,41 @@ class ACE_RMCast_IO_UDP; class ACE_INET_Addr; +//! Implement an Adapter for the ACE_RMCast_IO_UDP class +/*! + * Applications may wish to use the ACE_Reactor to demultiplex I/O + * events for an ACE_RMCast_IO_UDP object. However other application + * may choose to make ACE_RMCast_IO_UDP active, or they may dedicate + * their own threads for its events. + * To avoid couplin ACE_RMCast_IO_UDP with the Reactor we don't make + * it derived from ACE_Event_Handler or any other class in the Reactor + * framework, instead, this simple Adapter can forward the Reactor + * messages to an ACE_RMCast_IO_UDP object. + */ class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler { public: - ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver); - // Constructor - + //! Constructor, save io_udp as the Adaptee in the Adapter pattern. + ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io_udp); + + //! Destructor + /*! + * Notice that this class does not own the ACE_RMCast_IO_UDP + * adaptee, so it does not destroy it. + */ ~ACE_RMCast_UDP_Event_Handler (void); - // Destructor - // = The Event_Handler methods + //@{ + //! Documented in ACE_Event_Handler class virtual ACE_HANDLE get_handle (void) const; virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); virtual int handle_timeout (const ACE_Time_Value ¤t_time, const void *act = 0); + //@} private: + //! The adaptee ACE_RMCast_IO_UDP *io_udp_; - // The sender }; #if defined (__ACE_INLINE__) diff --git a/ace/RMCast/RMCast_UDP_Proxy.cpp b/ace/RMCast/RMCast_UDP_Proxy.cpp index 010267b0cbb..2eb0983b171 100644 --- a/ace/RMCast/RMCast_UDP_Proxy.cpp +++ b/ace/RMCast/RMCast_UDP_Proxy.cpp @@ -123,7 +123,7 @@ ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size) ACE_OS::memcpy (&tmp, buffer + 1, sizeof(tmp)); - ack.highest_in_sequence = ACE_NTOHL (tmp); + ack.next_expected = ACE_NTOHL (tmp); ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32), sizeof(tmp)); ack.highest_received = ACE_NTOHL (tmp); @@ -175,4 +175,3 @@ ACE_RMCast_UDP_Proxy::reply_leave (ACE_RMCast::Leave &leave) { return this->io_udp_->send_leave (leave, this->peer_addr_); } - diff --git a/ace/RMCast/RMCast_Worker.cpp b/ace/RMCast/RMCast_Worker.cpp new file mode 100644 index 00000000000..06254b8c0f6 --- /dev/null +++ b/ace/RMCast/RMCast_Worker.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#ifndef ACE_RMCAST_WORKER_CPP +#define ACE_RMCAST_WORKER_CPP + +#include "RMCast_Worker.h" + +#if ! defined (__ACE_INLINE__) +#include "RMCast_Worker.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(RMCast, RMCast_Worker, "$Id$") + +template<class KEY, class ITEM> +ACE_RMCast_Worker<KEY,ITEM>::~ACE_RMCast_Worker (void) +{ +} + +#endif /* ACE_RMCAST_WORKER_CPP */ diff --git a/ace/RMCast/RMCast_Worker.h b/ace/RMCast/RMCast_Worker.h new file mode 100644 index 00000000000..d3eb3032ebc --- /dev/null +++ b/ace/RMCast/RMCast_Worker.h @@ -0,0 +1,36 @@ +/* -*- C++ -*- */ +// $Id$ +// + +#ifndef ACE_RMCAST_WORKER_H +#define ACE_RMCAST_WORKER_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class KEY, class ITEM> +class ACE_RMCast_Worker +{ +public: + virtual ~ACE_RMCast_Worker (void); + + virtual int work (KEY const & key, + ITEM const & item) = 0; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Worker.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Worker.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Worker.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_RMCAST_WORKER_H */ diff --git a/ace/RMCast/RMCast_Worker.i b/ace/RMCast/RMCast_Worker.i new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/ace/RMCast/RMCast_Worker.i @@ -0,0 +1 @@ +// $Id$ diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile index ae13792c4c4..e97cd885493 100644 --- a/protocols/ace/RMCast/Makefile +++ b/protocols/ace/RMCast/Makefile @@ -636,17 +636,11 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Basic_Types.i \ $(ACE_ROOT)/ace/Trace.h \ $(ACE_ROOT)/ace/OS.i \ - RMCast_Export.h RMCast.i RMCast_Module.i \ - $(ACE_ROOT)/ace/RB_Tree.h \ - $(ACE_ROOT)/ace/Functor.h \ + RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \ + RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \ + $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/ACE.h \ $(ACE_ROOT)/ace/ACE.i \ - $(ACE_ROOT)/ace/Functor.i \ - $(ACE_ROOT)/ace/Functor_T.h \ - $(ACE_ROOT)/ace/Functor_T.i \ - $(ACE_ROOT)/ace/Functor_T.cpp \ - $(ACE_ROOT)/ace/RB_Tree.i \ - $(ACE_ROOT)/ace/Synch.h \ $(ACE_ROOT)/ace/Synch.i \ $(ACE_ROOT)/ace/Synch_T.h \ $(ACE_ROOT)/ace/Event_Handler.h \ @@ -664,6 +658,14 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU $(ACE_ROOT)/ace/Log_Record.h \ $(ACE_ROOT)/ace/Log_Priority.h \ $(ACE_ROOT)/ace/Log_Record.i \ + RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \ + $(ACE_ROOT)/ace/RB_Tree.h \ + $(ACE_ROOT)/ace/Functor.h \ + $(ACE_ROOT)/ace/Functor.i \ + $(ACE_ROOT)/ace/Functor_T.h \ + $(ACE_ROOT)/ace/Functor_T.i \ + $(ACE_ROOT)/ace/Functor_T.cpp \ + $(ACE_ROOT)/ace/RB_Tree.i \ $(ACE_ROOT)/ace/Malloc.h \ $(ACE_ROOT)/ace/Malloc_Base.h \ $(ACE_ROOT)/ace/Based_Pointer_T.h \ diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h index abf2a24e946..df3a0d48858 100644 --- a/protocols/ace/RMCast/RMCast.h +++ b/protocols/ace/RMCast/RMCast.h @@ -233,16 +233,19 @@ public: * * This message is used to provide feedback information to senders. * It contains two sequence numbers: - * - highest_in_sequence: is the sequence number of the last message - * received without any lost messages before it - * - highest_received: is the sequence number of the last_message - * successfully received, there may be some messages lost before it + * - \param next_expected: is the sequence number of the next message + * expected, i.e. (next_expected-1) is the last message received + * without any losses before it. + * - \param highest_received: is the highest sequence number among + * all the messages successfully received. + * In other words, all messages lost (if any) are in the range: + * [next_expected,highest_received) * * <CODE> * +---------+----------------------+<BR> * | 8 bits | MT_ACK |<BR> * +---------+----------------------+<BR> - * | 32 bits | highest_in_sequence |<BR> + * | 32 bits | next_expected |<BR> * +---------+----------------------+<BR> * | 32 bits | highest_received |<BR> * +---------+----------------------+<BR> @@ -251,7 +254,7 @@ public: struct Ack { //! The last message received without any losses before it. - ACE_UINT32 highest_in_sequence; + ACE_UINT32 next_expected; //! The last message successfully received ACE_UINT32 highest_received; diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp new file mode 100644 index 00000000000..f1553c7f4ab --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp @@ -0,0 +1,176 @@ +// $Id$ + +#ifndef ACE_RMCAST_COPY_ON_WRITE_CPP +#define ACE_RMCAST_COPY_ON_WRITE_CPP + +#include "RMCast_Copy_On_Write.h" + +#if ! defined (__ACE_INLINE__) +#include "RMCast_Copy_On_Write.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(RMCast, RMCast_Copy_On_Write, "$Id$") + +template<class COLLECTION, class ITERATOR> void +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_incr_refcnt (void) +{ + // LOCKING: no locking is required, the caller grabs the mutex. + this->refcount_++; +} + +template<class COLLECTION, class ITERATOR> void +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void) +{ + // LOCKING: no locking is required, the caller grabs the mutex. + { + this->refcount_--; + if (this->refcount_ != 0) + return; + } + //@@ TODO: If this wrapper is going to be completely general some + // kind of functor has to be provided to remove the elements in the + // collection, in case the are no self-managed + + delete this; +} + +// **************************************************************** + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write (void) + : pending_writes_ (0) + , writing_ (0) + , cond_ (mutex_) +{ + ACE_NEW (this->collection_, Collection); +} + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write (void) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + + while (this->pending_writes_ != 0) + this->cond_.wait (); + + this->collection_->_decr_refcnt (); + this->collection_ = 0; +} + +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>:: + for_each (ACE_RMCast_Worker<KEY,ITEM> *worker) +{ + Read_Guard ace_mon (this->mutex_, this->collection_); + + ITERATOR end = ace_mon.collection->collection.end (); + for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i) + { + int r = worker->work ((*i).key (), (*i).item ()); + if (r != 0) + return r; + } + return 0; +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, + ITEM const & i) +{ + Write_Guard ace_mon (this->mutex_, + this->cond_, + this->pending_writes_, + this->writing_, + this->collection_); + + return this->bind_i (ace_mon, k, i); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k) +{ + Write_Guard ace_mon (this->mutex_, + this->cond_, + this->pending_writes_, + this->writing_, + this->collection_); + + return this->unbind_i (ace_mon, k); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon, + KEY const & k, + ITEM const & i) +{ + return ace_mon.copy->collection.bind (k, i); +} + +template<class KEY, class ITEM, class C, class I> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon, + KEY const & k) +{ + return ace_mon.copy->collection.unbind (k); +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m, + ACE_SYNCH_CONDITION &c, + int &p, + int &w, + Collection*& cr) + : copy (0) + , mutex (m) + , cond (c) + , pending_writes (p) + , writing_flag (w) + , collection (cr) +{ + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); + + this->pending_writes++; + + while (this->writing_flag != 0) + this->cond.wait (); + + this->writing_flag = 1; + } + + // Copy outside the mutex, because it may take a long time. + // Nobody can change it, because it is protected by the + // writing_flag. + + // First initialize it (with the correct reference count + ACE_NEW (this->copy, Collection); + // Copy the contents + this->copy->collection = this->collection->collection; +} + +template<class COLLECTION, class ITERATOR> +ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write_Write_Guard (void) +{ + Collection *tmp = 0; + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex); + + tmp = this->collection; + this->collection = this->copy; + this->writing_flag = 0; + this->pending_writes--; + + this->cond.signal (); + } + // Delete outside the mutex, because it may take a long time. + tmp->_decr_refcnt (); +} + +// **************************************************************** + +#endif /* ACE_RMCAST_COPY_ON_WRITE_CPP */ diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h new file mode 100644 index 00000000000..8724e23a5d5 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h @@ -0,0 +1,173 @@ +/* -*- C++ -*- */ +// $Id$ +// + +#ifndef ACE_RMCAST_COPY_ON_WRITE_H +#define ACE_RMCAST_COPY_ON_WRITE_H +#include "ace/pre.h" + +#include "RMCast_Worker.h" +#include "ace/Synch.h" + +//! A wrapper to implement reference counted collections +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Collection +{ +public: + //! Constructor + ACE_RMCast_Copy_On_Write_Collection (void); + + //! Increment the reference count + void _incr_refcnt (void); + + //! Decrement the reference count + void _decr_refcnt (void); + + //! The actual collection + COLLECTION collection; + +private: + //! The reference count + ACE_UINT32 refcount_; +}; + +// **************************************************************** + +//! Implement a read guard for a reference counted collection +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Read_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &mutex, + Collection *&collection); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Read_Guard (void); + + //! A reference to the collection + Collection *collection; + +private: + //! Synchronization + ACE_SYNCH_MUTEX &mutex_; +}; + +// **************************************************************** + +//! Implement the write guard for a reference counted collecion +/*! + * This helper class atomically increments the reference count of a + * ACE_RMCast_Copy_On_Write_Collection and reads the current + * collection in the Copy_On_Write class. + */ +template<class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write_Write_Guard +{ +public: + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex, + ACE_SYNCH_CONDITION &cond, + int &pending_writes, + int &writing_flag, + Collection*& collection); + + //! Destructor + ~ACE_RMCast_Copy_On_Write_Write_Guard (void); + + //! The collection + Collection *copy; + +private: + //! Keep a reference to the mutex + ACE_SYNCH_MUTEX &mutex; + + //! Keep a reference to the condition variable + ACE_SYNCH_CONDITION &cond; + + //! Use a reference to update the pending writes count + int &pending_writes; + + //! Use a reference to update the writing flag + int &writing_flag; + + //! Use this reference to update the collection once the + //! modifications are finished. + Collection *&collection; +}; + +// **************************************************************** + +//! Implement a copy on write wrapper for a map-like collection +template<class KEY, class ITEM, class COLLECTION, class ITERATOR> +class ACE_RMCast_Copy_On_Write +{ +public: + //! The Read_Guard trait + typedef ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR> Read_Guard; + + //! The Write_Guard trait + typedef ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR> Write_Guard; + + //! The underlying collection type + typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection; + + //! Constructor + ACE_RMCast_Copy_On_Write (void); + + //! Destructor + ~ACE_RMCast_Copy_On_Write (void); + + //! Iterate over all the elements invoking \param worker on each one. + int for_each (ACE_RMCast_Worker<KEY,ITEM> *worker); + + //! Add a new element + int bind (KEY const & key, ITEM const & item); + + //! Remove an element + int unbind (KEY const & key); + + //! Bind assuming the Write_Guard is held + int bind_i (Write_Guard &guard, KEY const & key, ITEM const & item); + + //! Unbind assuming the Write_Guard is held + int unbind_i (Write_Guard &guard, KEY const & key); + + //! Number of pending writes + int pending_writes_; + + //! If non-zero then a thread is changing the collection. + /*! + * Many threads can use the collection simulatenously, but only one + * change it. + */ + int writing_; + + //! A mutex to serialize access to the collection pointer. + ACE_SYNCH_MUTEX mutex_; + + //! A condition variable to wait to synchronize multiple writers. + ACE_SYNCH_CONDITION cond_; + + //! The collection, with reference counting added + Collection *collection_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Copy_On_Write.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Copy_On_Write.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Copy_On_Write.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_COPY_ON_WRITE_H */ diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i new file mode 100644 index 00000000000..c6e5099cda5 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i @@ -0,0 +1,36 @@ +// $Id$ + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Collection (void) + : refcount_ (1) +{ +} + +// **************************************************************** + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: + ACE_RMCast_Copy_On_Write_Read_Guard (ACE_SYNCH_MUTEX &m, + Collection*& collection_ref) + : collection (0) + , mutex_ (m) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->collection = collection_ref; + this->collection->_incr_refcnt (); +} + +template<class COLLECTION, class ITERATOR> ACE_INLINE +ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>:: + ~ACE_RMCast_Copy_On_Write_Read_Guard (void) +{ + if (this->collection != 0) + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_); + this->collection->_decr_refcnt (); + } +} + +// **************************************************************** + diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h index 7b64d763ebc..eed08c92517 100644 --- a/protocols/ace/RMCast/RMCast_Fragment.h +++ b/protocols/ace/RMCast/RMCast_Fragment.h @@ -1,15 +1,5 @@ // $Id$ -// ============================================================================ -// -// = DESCRIPTION -// The fragmentation task for the reliable multicast library -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - #ifndef ACE_RMCAST_FRAGMENT_H #define ACE_RMCAST_FRAGMENT_H #include "ace/pre.h" @@ -21,28 +11,46 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +//! Default fragment size #ifndef ACE_RMCAST_DEFAULT_FRAGMENT_SIZE # define ACE_RMCAST_DEFAULT_FRAGMENT_SIZE 1024 #endif /* ACE_RMCAST_DEFAULT_FRAGMENT_SIZE */ +//! Fragmentation module +/*! + * Some transports cannot send very big messages, for example UDP + * imposes a limit of 64K, and in practice the limit is even more + * strict than that. + * This class decomposes a message into multiple fragments, using an + * application defined maximum size. + */ class ACE_RMCast_Export ACE_RMCast_Fragment : public ACE_RMCast_Module { public: + //! Constructor ACE_RMCast_Fragment (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Fragment (void); - // Destructor + //! Accessor for the max_fragment size. + /*! There is no modifier, the maximum fragment size is obtained + * using feedback from the lower layers (transport?) + * @@TODO We have not implemented the feedback mechanisms yet! + */ size_t max_fragment_size (void) const; - // Accessor for the max_fragment size. - // There is no modifier, the maximum fragment size is obtained using - // feedback from the lower layer (transport?) - // = The ACE_RMCast_Module methods + /*! + * Only data messages need fragmentation, the control messages are + * all small enough for all the transports that I know about. + * Well, actually for CAN-Bus (Controller Area Network), they may be + * too big, because the max payload there is 8 bytes, but we don't + * play with those in ACE. + */ virtual int data (ACE_RMCast::Data &data); private: + //! Current fragment size limit size_t max_fragment_size_; }; diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp index af655f3130f..421982d5ad6 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.cpp +++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp @@ -354,7 +354,7 @@ ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack, char header[16]; header[0] = ACE_RMCast::MT_ACK; - ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence); + ACE_UINT32 tmp = ACE_HTONL (ack.next_expected); ACE_OS::memcpy (header + 1, &tmp, sizeof(ACE_UINT32)); tmp = ACE_HTONL (ack.highest_received); diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.h b/protocols/ace/RMCast/RMCast_IO_UDP.h index bdcccabe6e1..5af403bf994 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.h +++ b/protocols/ace/RMCast/RMCast_IO_UDP.h @@ -33,44 +33,65 @@ class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module { public: + //! Constructor + /*! + * The <factory> argument is used to create the modules for each + * proxy that process incoming messages. The class does *not* assume + * ownership of <factory>, the caller owns it. But it does assume + * ownership of the modules returned by the factory, and it may ask + * the factory to release them eventually. + */ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory); - // Constructor - // <factory> is used to create the modules for each proxy that - // process incoming messages. The class does *not* assume ownership - // of <factory>, the caller owns it. + //! Destructor ~ACE_RMCast_IO_UDP (void); - // Destructor + //! Join a new multicast group + /*! + * Start receiving data for the <mcast_addr> multicast group. + * Please read the documentation of ACE_SOCK_Dgram_Mcast for more + * details. + */ int subscribe (const ACE_INET_Addr &mcast_addr, int reuse_addr = 1, const ACE_TCHAR *net_if = 0, int protocol_family = PF_INET, int protocol = 0); - // Start receiving data for the <mcast_addr> multicast group. - // Please read the documentation of <ACE_SOCK_Dgram_Mcast> for more - // details. // The class can be used with a Reactor or using blocking I/O // depending on what method of the following two is called. + //! Wait for events for the period <tv>. If <tv> is zero it blocks + //! forever. int handle_events (ACE_Time_Value *tv = 0); - // Wait for events for the period <tv>. If <tv> is zero it blocks - // forever. + //! Register any event handlers into <reactor> + /*! + * @@TODO: This should be left for the clients of the class, there + * is no reason why this class must know about reactors. + */ int register_handlers (ACE_Reactor *reactor); - // Register any event handlers into <reactor> + //! Remove all the handlers from the reactor + /*! + * @@TODO: This should be left for the clients of the class, there + * is no reason why this class must know about reactors. + */ int remove_handlers (void); - // Remove all the handlers from the reactor + //! There is data to read, read it and process it. int handle_input (ACE_HANDLE h); - // There is data to read, read it and process it. + //! Obtain the handle for the underlying socket ACE_HANDLE get_handle (void) const; - // Obtain the handle for the underlying socket - // Send back to the remove object represented by <proxy> + //@{ + //! Send the message to the ACE_INET_Addr argument. + /*! + * These methods are used in the implementation of the + * ACE_RMCast_UDP_Proxy objects and the implementation of the + * inherited ACE_RMCast_Module methods in this class. + */ int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &); int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &); int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &); @@ -78,8 +99,9 @@ public: int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &); int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &); int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &); + //@} - // = The RMCast_Module methods + // Please read the documentation in ACE_RMCast_Module for more details virtual int data (ACE_RMCast::Data &); virtual int poll (ACE_RMCast::Poll &); virtual int ack_join (ACE_RMCast::Ack_Join &); @@ -87,23 +109,24 @@ public: virtual int ack (ACE_RMCast::Ack &); virtual int join (ACE_RMCast::Join &); virtual int leave (ACE_RMCast::Leave &); - // The messages are sent to the multicast group private: + //! The factory used to create the modules attached to each proxy ACE_RMCast_Module_Factory *factory_; - // The factory used to create the modules attached to each proxy + //! The multicast group we subscribe and send to ACE_INET_Addr mcast_group_; - // The multicast group we subscribe and send to + //! The socket used to receive and send data ACE_SOCK_Dgram_Mcast dgram_; - // The socket + //! Use a Hash_Map to maintain the collection of proxies typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map; + //! The collection of proxies Map map_; + //! The event handler adapter ACE_RMCast_UDP_Event_Handler eh_; - // The event handler adapter }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Membership.cpp b/protocols/ace/RMCast/RMCast_Membership.cpp index 6ee2690a41f..a23d7a756e5 100644 --- a/protocols/ace/RMCast/RMCast_Membership.cpp +++ b/protocols/ace/RMCast/RMCast_Membership.cpp @@ -28,14 +28,14 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) ACE_RMCast::Ack next_ack; { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - if (ack.highest_in_sequence < this->highest_in_sequence_) + if (ack.next_expected < this->next_expected_) { // @@ This violates an invariant of the class, shouldn't // happen... // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[3]\n")); return -1; } - else if (ack.highest_in_sequence == this->highest_in_sequence_) + else if (ack.next_expected == this->next_expected_) { // Nothing new, just continue.... // ACE_DEBUG ((LM_DEBUG, "ACE_RMCast_Membership::ack[4]\n")); @@ -43,21 +43,23 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) } // Possible update, re-evaluate the story... - ACE_UINT32 highest_in_sequence = (*i)->highest_in_sequence (); + ACE_UINT32 next_expected = (*i)->next_expected (); ACE_UINT32 highest_received = (*i)->highest_received (); ++i; for (; i != end; ++i) { - ACE_UINT32 s = (*i)->highest_in_sequence (); - if (s < highest_in_sequence) - highest_in_sequence = s; + ACE_UINT32 s = (*i)->next_expected (); + if (s < next_expected) + next_expected = s; ACE_UINT32 r = (*i)->highest_received (); if (r > highest_received) highest_received = r; } #if 0 - if (this->highest_in_sequence_ >= highest_in_sequence + // @@TODO: this is an important feature, disabled until it is + // fully debugged + if (this->next_expected_ >= next_expected || this->highest_received_ >= highest_received) { // No change.... @@ -65,12 +67,12 @@ ACE_RMCast_Membership::ack (ACE_RMCast::Ack &ack) return 0; } #endif /* 0 */ - this->highest_in_sequence_ = highest_in_sequence; + this->next_expected_ = next_expected; this->highest_received_ = highest_received; if (this->next () == 0) return 0; next_ack.source = ack.source; - next_ack.highest_in_sequence = this->highest_in_sequence_; + next_ack.next_expected = this->next_expected_; next_ack.highest_received = this->highest_received_; } // @@ This looks like a race condition, next() is checked inside the @@ -89,6 +91,8 @@ ACE_RMCast_Membership::join (ACE_RMCast::Join &join) ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); if (this->proxies_.insert (join.source) == -1) return -1; + // @@TODO: This may change the next Ack to send up, should + // recompute and send the right message if that was the case. } return this->ACE_RMCast_Module::join (join); @@ -103,6 +107,8 @@ ACE_RMCast_Membership::leave (ACE_RMCast::Leave &leave) { ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); (void) this->proxies_.remove (leave.source); + // @@TODO: This may change the next Ack to send up, should + // recompute and send the right message if that was the case. } return this->ACE_RMCast_Module::leave (leave); diff --git a/protocols/ace/RMCast/RMCast_Membership.h b/protocols/ace/RMCast/RMCast_Membership.h index a99a7752507..21ee1bea97b 100644 --- a/protocols/ace/RMCast/RMCast_Membership.h +++ b/protocols/ace/RMCast/RMCast_Membership.h @@ -28,41 +28,59 @@ class ACE_RMCast_Proxy; +//! Track peer membership +/*! + * Reliable senders of events need to know exactly how many peers are + * receiving the events, and how many events has each peer received so + * far. + * This class uses the Join, Leave and Ack messages to build that + * information, it also summarizes the Ack events and propagate only + * the global info to the upper layer. + */ class ACE_RMCast_Export ACE_RMCast_Membership : public ACE_RMCast_Module { - // = TITLE - // Track Receiver membership - // - // = DESCRIPTION - // Define the interface for all reliable multicast membership public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Membership (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Membership (void); - // Destructor - // = The RMCast_Module methods + //! Receive an process an Ack message + /*! + * After receiving the Ack message we find out what is the lowest + * sequence number received in order among all the acks received by + * the proxies in the collection. We also find out what is the + * highest sequence number received by any proxy. + * We only propagate that information back to the upper layer, and + * then only if there are any news since the last Ack. + */ virtual int ack (ACE_RMCast::Ack &); + + //! Add a new member to the collection, using the <source> field in + //! the Join message virtual int join (ACE_RMCast::Join &); + + //! Remove a member from the collection, using the <source> field in + //! the Join message virtual int leave (ACE_RMCast::Leave &); protected: + //! Use an unbounded set to maintain the collection of proxies. typedef ACE_Unbounded_Set<ACE_RMCast_Proxy*> Proxy_Collection; typedef ACE_Unbounded_Set_Iterator<ACE_RMCast_Proxy*> Proxy_Iterator; + //! The collection of proxies Proxy_Collection proxies_; - // The membership buffer - ACE_UINT32 highest_in_sequence_; - // The smallest value of <highest_in_sequence> for all the proxies + //! The smallest value of \param next_expected for all the proxies + ACE_UINT32 next_expected_; + //! The highest value of \param highest_received for all the proxies ACE_UINT32 highest_received_; - // The highest value of <highest_received> for all the proxies + //! Synchronization ACE_SYNCH_MUTEX mutex_; - // Synchronization }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Membership.i b/protocols/ace/RMCast/RMCast_Membership.i index 0c3e33c2d01..b513c2d5141 100644 --- a/protocols/ace/RMCast/RMCast_Membership.i +++ b/protocols/ace/RMCast/RMCast_Membership.i @@ -2,7 +2,7 @@ ACE_INLINE ACE_RMCast_Membership::ACE_RMCast_Membership (void) - : highest_in_sequence_ (0) + : next_expected_ (0) , highest_received_ (0) { } diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h index dc4077fa4ab..fad76caac53 100644 --- a/protocols/ace/RMCast/RMCast_Module.h +++ b/protocols/ace/RMCast/RMCast_Module.h @@ -36,55 +36,55 @@ class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_Module { public: - // = Initialization and termination methods. + //! Constructor ACE_RMCast_Module (void); - //!< Constructor + //! Destructor virtual ~ACE_RMCast_Module (void); - //!< Destructor + //! Modifier for the next element in the stack virtual int next (ACE_RMCast_Module *next); - //!< Modifier for the next element in the stack + //! Accesor for the next element in the stack virtual ACE_RMCast_Module* next (void) const; - //!< Accesor for the next element in the stack + //! Modifier for the previous element in the stack virtual int prev (ACE_RMCast_Module *prev); - //!< Modifier for the previous element in the stack + //! Accesor for the previous element in the stack virtual ACE_RMCast_Module* prev (void) const; - //!< Accesor for the previous element in the stack + //! Initialize the module, setting up the next module virtual int open (void); - //!< Initialize the module, setting up the next module + //! Close the module. virtual int close (void); - //!< Close the module. + //! Push data through the stack virtual int data (ACE_RMCast::Data &); - //!< Push data through the stack + //! Push a polling request through the stack virtual int poll (ACE_RMCast::Poll &); - //!< Push a polling request through the stack + //! Push a message to ack a join request through the stack virtual int ack_join (ACE_RMCast::Ack_Join &); - //!< Push a message to ack a join request through the stack + //! Push a message to ack a leave request through the stack virtual int ack_leave (ACE_RMCast::Ack_Leave &); - //!< Push a message to ack a leave request through the stack + //! Push an ack mesage through the stack virtual int ack (ACE_RMCast::Ack &); - //!< Push an ack mesage through the stack + //! Push a join message through the stack virtual int join (ACE_RMCast::Join &); - //!< Push a join message through the stack + //! Push a leave message through the stack virtual int leave (ACE_RMCast::Leave &); - //!< Push a leave message through the stack private: //! The next element in the stack ACE_RMCast_Module *next_; + //! The previous element in the stack ACE_RMCast_Module *prev_; }; diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h index 722ad87d678..f0ea58df0e5 100644 --- a/protocols/ace/RMCast/RMCast_Module_Factory.h +++ b/protocols/ace/RMCast/RMCast_Module_Factory.h @@ -27,19 +27,40 @@ class ACE_RMCast_Module; class ACE_RMCast_IO_UDP; +//! Create Module stacks +/*! + * Different application will probably require different + * configurations in their Module stack, some will just want best + * effort semantics. Others will use Reliable communication with a + * maximum retransmission time. Furthermore, applications may want to + * receive messages in send order, or just as soon as they are + * received. + * Obviously most applications will want to change want happens once a + * message is completely received. + * + * To achieve all this flexibility the IO layer uses this factory to + * create the full stack of Modules corresponding to a single + * consumer. + * To keep the complexity under control the intention is to create + * helper Factories, such as Reliable_Module_Factory where + * applications only need to customize a few features. + */ class ACE_RMCast_Export ACE_RMCast_Module_Factory { - // = DESCRIPTION - // public: + //! Destructor virtual ~ACE_RMCast_Module_Factory (void); - // Destructor + //! Create a new proxy virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0; - // Create a new proxy + //! Destroy a proxy + /*! + * Some factories may allocate modules from a pool, or return the + * same module for all proxies. Consequently, only the factory + * knows how to destroy them. + */ virtual void destroy (ACE_RMCast_Module *) = 0; - // Destroy a proxy }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Partial_Message.h b/protocols/ace/RMCast/RMCast_Partial_Message.h index 9b71eb4a541..88fa9ab2f1a 100644 --- a/protocols/ace/RMCast/RMCast_Partial_Message.h +++ b/protocols/ace/RMCast/RMCast_Partial_Message.h @@ -26,44 +26,72 @@ #define ACE_RMCAST_DEFAULT_HOLE_COUNT 16 #endif /* ACE_RMCAST_DEFAULT_HOLE_COUNT */ +//! Represent a partially received message in the +//! ACE_RMCast_Reassembly module +/*! + * This class provides temporary storage for the fragments as they are + * received in the ACE_RMCast_Reassembly module. It also keeps track + * of what portions of the message are still missing. + */ class ACE_RMCast_Export ACE_RMCast_Partial_Message { public: + //! Constructor, reserve enough memory for the complete message ACE_RMCast_Partial_Message (ACE_UINT32 message_size); + + //! Destructor ~ACE_RMCast_Partial_Message (void); + //! Process a fragment + /*! + * A fragment starting at <offset> has been received, copy the + * fragment contents and update the list of holes. + */ int fragment_received (ACE_UINT32 message_size, ACE_UINT32 offset, ACE_Message_Block *mb); + + //! Return 1 if the message is complete int is_complete (void) const; + //! Return the body of the message, the memory is *not* owned by the + //! caller ACE_Message_Block *message_body (void); - // Return the body of the message, the memory is owned by the - // class. private: + //! Insert a new hole into the list + /*! + * The class keeps an array to represent the missing portions of the + * message. This method inserts a new hole, i.e. a new element in + * the array at index <i>. The <start> and <end> arguments represent + * the offsets of the missing portion of the message. + */ int insert_hole (size_t i, ACE_UINT32 start, ACE_UINT32 end); - // Insert a new hole into the list + //! Remove a hole from the list int remove_hole (size_t i); - // Remove a hole from the list private: + //! Maintain the message storage ACE_Message_Block message_body_; - // Used to rebuild the body of the message + //! Represent a missing portion of a message struct Hole { + //! Offset where the missing portion of the message starts ACE_UINT32 start; + //! Offset where the missing portion of the message ends ACE_UINT32 end; }; + //! Implement a growing array of Hole structures + //@{ Hole *hole_list_; size_t max_hole_count_; size_t hole_count_; - // The current list of holes in the message_body. + //@} }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Proxy.cpp b/protocols/ace/RMCast/RMCast_Proxy.cpp index 53d9d0b6726..f6b2bbec5e5 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_Proxy.cpp @@ -15,9 +15,9 @@ ACE_RMCast_Proxy::~ACE_RMCast_Proxy (void) } ACE_UINT32 -ACE_RMCast_Proxy::highest_in_sequence (void) const +ACE_RMCast_Proxy::next_expected (void) const { - return this->highest_in_sequence_; + return this->next_expected_; } ACE_UINT32 @@ -29,7 +29,7 @@ ACE_RMCast_Proxy::highest_received (void) const int ACE_RMCast_Proxy::ack (ACE_RMCast::Ack &ack) { - this->highest_in_sequence_ = ack.highest_in_sequence; + this->next_expected_ = ack.next_expected; this->highest_received_ = ack.highest_received; return this->ACE_RMCast_Module::ack (ack); } diff --git a/protocols/ace/RMCast/RMCast_Proxy.h b/protocols/ace/RMCast/RMCast_Proxy.h index 414b74174fb..e0e6afe79b1 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.h +++ b/protocols/ace/RMCast/RMCast_Proxy.h @@ -48,27 +48,28 @@ public: //! Destructor virtual ~ACE_RMCast_Proxy (void); - - //! Return the highest sequence number received without any losses - //! before it. Only applies to remote receiver proxies. + + //! Return the next sequence number expected by the peer. Only + //! applies to remote receiver proxies. /*! - Please read the documentation in ACE_RMCast::Ack + * Please read the documentation in ACE_RMCast::Ack */ - virtual ACE_UINT32 highest_in_sequence (void) const; + virtual ACE_UINT32 next_expected (void) const; //! Return the highest sequence number successfully received. //! Only applies to remote receiver proxies. /*! - Please read the documentation in ACE_RMCast::Ack + * Please read the documentation in ACE_RMCast::Ack */ virtual ACE_UINT32 highest_received (void) const; //@{ //! Send messages directly to the peer. - /*! Send a message directly to the peer, i.e. the message is not - sent through the multicast group and it may not be processed by - all the layers in the stack. - */ + /*! + * Send a message directly to the peer, i.e. the message is not + * sent through the multicast group and it may not be processed by + * all the layers in the stack. + */ virtual int reply_data (ACE_RMCast::Data &) = 0; virtual int reply_poll (ACE_RMCast::Poll &) = 0; virtual int reply_ack_join (ACE_RMCast::Ack_Join &) = 0; @@ -79,8 +80,8 @@ public: //@} /*! - Proxies process the ACK sequence numbers to save the sequence - numbers reported from the remote peer. + * Proxies process the ACK sequence numbers to cache the ack + * information from the peer. */ virtual int ack (ACE_RMCast::Ack &); @@ -88,7 +89,7 @@ private: //@{ //! Cache the sequence numbers reported from the remote peer using //! Ack messages - ACE_UINT32 highest_in_sequence_; + ACE_UINT32 next_expected_; ACE_UINT32 highest_received_; //@} }; diff --git a/protocols/ace/RMCast/RMCast_Proxy.i b/protocols/ace/RMCast/RMCast_Proxy.i index f93feaa5639..6fee09fe9e5 100644 --- a/protocols/ace/RMCast/RMCast_Proxy.i +++ b/protocols/ace/RMCast/RMCast_Proxy.i @@ -2,7 +2,7 @@ ACE_INLINE ACE_RMCast_Proxy::ACE_RMCast_Proxy (void) - : highest_in_sequence_ (0) + : next_expected_ (0) , highest_received_ (0) { } diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp index a996e1204d5..7e38cdf7c97 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.cpp +++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp @@ -16,18 +16,60 @@ ACE_RMCast_Retransmission::~ACE_RMCast_Retransmission (void) { } +class ACE_RMCast_Resend_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Resend_Worker (ACE_RMCast_Module *next, + ACE_UINT32 max_sequence_number) + : n (0) + , next_ (next) + , max_sequence_number_ (max_sequence_number) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const & item) + { + if (key > this->max_sequence_number_) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::resend - message %d resent\n", + key)); + ACE_RMCast::Data data = item; + int r = this->next_->data (data); + if (r != 0) + return r; + n++; + return 0; + } + + int n; + +private: + ACE_RMCast_Module *next_; + + ACE_UINT32 max_sequence_number_; +}; + int -ACE_RMCast_Retransmission::close (void) +ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number) { - Messages_Iterator end = this->messages_.end (); + if (this->next () == 0) + return 0; - for (Messages_Iterator i = this->messages_.begin (); - i != end; - ++i) - { - ACE_Message_Block::release ((*i).item ().payload); - } - this->messages_.close (); + ACE_RMCast_Resend_Worker worker (this->next (), max_sequence_number); + + if (this->messages_.for_each (&worker) == -1) + return -1; + + return worker.n; +} + +int +ACE_RMCast_Retransmission::close (void) +{ + // @@ return 0; } @@ -40,7 +82,6 @@ ACE_RMCast_Retransmission::data (ACE_RMCast::Data &data) int r = this->next ()->data (data); if (r == 0) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); ACE_RMCast::Data copy = data; copy.payload = ACE_Message_Block::duplicate (data.payload); r = this->messages_.bind (data.sequence_number, copy); @@ -55,8 +96,9 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; ACE_RMCast::Ack_Join ack_join; +#if 0 + // @@ { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); Messages_Iterator end = this->messages_.end (); Messages_Iterator begin = this->messages_.begin (); @@ -70,6 +112,7 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) ack_join.next_sequence_number = (*begin).key (); } } +#endif (void) join.source->reply_ack_join (ack_join); // @@ We should force a full retransmission of all the messages! @@ -77,20 +120,54 @@ ACE_RMCast_Retransmission::join (ACE_RMCast::Join &join) return 0; } +class ACE_RMCast_Ack_Worker + : public ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data> +{ +public: + ACE_RMCast_Ack_Worker (ACE_RMCast::Ack &ack, + ACE_RMCast_Retransmission::Messages::Write_Guard &g, + ACE_RMCast_Retransmission::Messages *messages) + : ack_ (ack) + , ace_mon_ (g) + , messages_ (messages) + { + } + + int work (ACE_UINT32 const & key, + ACE_RMCast::Data const &) + { + if (key >= this->ack_.next_expected) + return 0; + ACE_DEBUG ((LM_DEBUG, + " Retransmission::ack - message %d erased\n", + key)); + return this->messages_->unbind_i (this->ace_mon_, key); + } + +private: + ACE_RMCast_Ack_Worker (const ACE_RMCast_Ack_Worker&); + ACE_RMCast_Ack_Worker& operator= (const ACE_RMCast_Ack_Worker&); + +private: + ACE_RMCast::Ack &ack_; + + ACE_RMCast_Retransmission::Messages::Write_Guard &ace_mon_; + + ACE_RMCast_Retransmission::Messages *messages_; +}; + int ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1); - for (Messages_Iterator i = this->messages_.begin (); - i != this->messages_.end (); - /* do nothing */) - { - ACE_UINT32 key = (*i).key (); - if (key > ack.highest_in_sequence) - break; - this->messages_.unbind (key); - } - return 0; + Messages::Write_Guard ace_mon (this->messages_.mutex_, + this->messages_.cond_, + this->messages_.pending_writes_, + this->messages_.writing_, + this->messages_.collection_); + + ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_); + + return this->messages_.for_each (&worker); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) @@ -101,4 +178,10 @@ template class ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<AC template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex>; template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>; +template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>; +template class ACE_RMCast_Worker<ACE_UINT32,ACE_RMCast::Data>; + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_Retransmission.h b/protocols/ace/RMCast/RMCast_Retransmission.h index 7c586fe5dd6..b7bc20d2914 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.h +++ b/protocols/ace/RMCast/RMCast_Retransmission.h @@ -19,6 +19,7 @@ #include "ace/pre.h" #include "RMCast_Module.h" +#include "RMCast_Copy_On_Write.h" #include "ace/RB_Tree.h" #include "ace/Synch.h" @@ -26,38 +27,72 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +//! Store messages for retransmission in reliable configurations +/*! + * Reliable configurations of the RMCast framework need to store + * messages on the sender side to resend them if one or more clients + * do not receive them successfully. + */ class ACE_RMCast_Export ACE_RMCast_Retransmission : public ACE_RMCast_Module { - // = TITLE - // Reliable Multicast Retransmission - // - // = DESCRIPTION - // Define the interface for all reliable multicast retransmission public: // = Initialization and termination methods. + //! Constructor ACE_RMCast_Retransmission (void); - // Constructor + //! Destructor virtual ~ACE_RMCast_Retransmission (void); - // Destructor - // = The RMCast_Module methods + //! Use a Red-Black Tree to keep the queue of messages + typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection; + typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Collection_Iterator; + + //! The messages are stored in the Copy_On_Write wrapper to provide + //! an efficient, but thread safe interface. + typedef ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,Collection,Collection_Iterator> Messages; + + //! Resend messages + /*! + * Resends all the messages up to \param max_sequence_number + * Returns the number of messages sent, or -1 if there where any + * errors. + */ + int resend (ACE_UINT32 max_sequence_number); + + //! Cleanup all the stored messages virtual int close (void); + + //! Pass the message downstream, but also save it in the + //! retransmission queue + /*! + * Sequence number are assigned by the ACE_RMCast_Fragmentation + * class, consequently this class first passes the message + * downstream, to obtain the sequence number and then stores the + * message for later retransmission. + */ virtual int data (ACE_RMCast::Data &data); + + //! Process an Ack message from the remote receivers. + /*! + * Normally this Ack message will be a summary of all the Ack + * messages received by the ACE_RMCast_Membership class + */ virtual int ack (ACE_RMCast::Ack &); + + //! Detect when new members join the group and Ack_Join them + /*! + * When a new receiver joins the group this module sends an Ack_Join + * message with the next sequence number that the receiver should + * expect. + * The sequence number is obtained from the current list of cached + * messages. + */ virtual int join (ACE_RMCast::Join &); protected: - typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> - Messages; - typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> - Messages_Iterator; + //! The retransmission buffer Messages messages_; - // The retransmission buffer - - ACE_SYNCH_MUTEX mutex_; - // Synchronization }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h index 02798cee7f8..2a6e7c45d42 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h +++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h @@ -1,16 +1,5 @@ // $Id$ -// ============================================================================ -// -// = DESCRIPTION -// Implement an adapter between the ACE Reactor and the -// ACE_RMCast_IO_UDP -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - #ifndef ACE_RMCAST_UDP_EVENT_HANDLER_H #define ACE_RMCAST_UDP_EVENT_HANDLER_H #include "ace/pre.h" @@ -25,24 +14,41 @@ class ACE_RMCast_IO_UDP; class ACE_INET_Addr; +//! Implement an Adapter for the ACE_RMCast_IO_UDP class +/*! + * Applications may wish to use the ACE_Reactor to demultiplex I/O + * events for an ACE_RMCast_IO_UDP object. However other application + * may choose to make ACE_RMCast_IO_UDP active, or they may dedicate + * their own threads for its events. + * To avoid couplin ACE_RMCast_IO_UDP with the Reactor we don't make + * it derived from ACE_Event_Handler or any other class in the Reactor + * framework, instead, this simple Adapter can forward the Reactor + * messages to an ACE_RMCast_IO_UDP object. + */ class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler { public: - ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver); - // Constructor - + //! Constructor, save io_udp as the Adaptee in the Adapter pattern. + ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io_udp); + + //! Destructor + /*! + * Notice that this class does not own the ACE_RMCast_IO_UDP + * adaptee, so it does not destroy it. + */ ~ACE_RMCast_UDP_Event_Handler (void); - // Destructor - // = The Event_Handler methods + //@{ + //! Documented in ACE_Event_Handler class virtual ACE_HANDLE get_handle (void) const; virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); virtual int handle_timeout (const ACE_Time_Value ¤t_time, const void *act = 0); + //@} private: + //! The adaptee ACE_RMCast_IO_UDP *io_udp_; - // The sender }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp index 010267b0cbb..2eb0983b171 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp @@ -123,7 +123,7 @@ ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size) ACE_OS::memcpy (&tmp, buffer + 1, sizeof(tmp)); - ack.highest_in_sequence = ACE_NTOHL (tmp); + ack.next_expected = ACE_NTOHL (tmp); ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32), sizeof(tmp)); ack.highest_received = ACE_NTOHL (tmp); @@ -175,4 +175,3 @@ ACE_RMCast_UDP_Proxy::reply_leave (ACE_RMCast::Leave &leave) { return this->io_udp_->send_leave (leave, this->peer_addr_); } - diff --git a/protocols/ace/RMCast/RMCast_Worker.cpp b/protocols/ace/RMCast/RMCast_Worker.cpp new file mode 100644 index 00000000000..06254b8c0f6 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Worker.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#ifndef ACE_RMCAST_WORKER_CPP +#define ACE_RMCAST_WORKER_CPP + +#include "RMCast_Worker.h" + +#if ! defined (__ACE_INLINE__) +#include "RMCast_Worker.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(RMCast, RMCast_Worker, "$Id$") + +template<class KEY, class ITEM> +ACE_RMCast_Worker<KEY,ITEM>::~ACE_RMCast_Worker (void) +{ +} + +#endif /* ACE_RMCAST_WORKER_CPP */ diff --git a/protocols/ace/RMCast/RMCast_Worker.h b/protocols/ace/RMCast/RMCast_Worker.h new file mode 100644 index 00000000000..d3eb3032ebc --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Worker.h @@ -0,0 +1,36 @@ +/* -*- C++ -*- */ +// $Id$ +// + +#ifndef ACE_RMCAST_WORKER_H +#define ACE_RMCAST_WORKER_H + +#include "ace/config-all.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class KEY, class ITEM> +class ACE_RMCast_Worker +{ +public: + virtual ~ACE_RMCast_Worker (void); + + virtual int work (KEY const & key, + ITEM const & item) = 0; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Worker.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "RMCast_Worker.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("RMCast_Worker.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_RMCAST_WORKER_H */ diff --git a/protocols/ace/RMCast/RMCast_Worker.i b/protocols/ace/RMCast/RMCast_Worker.i new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Worker.i @@ -0,0 +1 @@ +// $Id$ diff --git a/tests/Makefile b/tests/Makefile index 3a630c89bdf..0f1b0fc51d2 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -137,8 +137,12 @@ ifneq ($(ACE_HAS_GNUG_PRE_2_8),1) endif endif +ifeq ($(rmcast),1) + DIRS += RMCast +endif + include $(ACE_ROOT)/include/makeinclude/rules.common.GNU -include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nested.GNU include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU ifdef purify diff --git a/tests/RMCast/Makefile b/tests/RMCast/Makefile index c1949350a25..0741c7df36f 100644 --- a/tests/RMCast/Makefile +++ b/tests/RMCast/Makefile @@ -11,7 +11,8 @@ BIN = RMCast_Fragment_Test \ RMCast_Reassembly_Test \ RMCast_UDP_Best_Effort_Test \ - RMCast_Membership_Test + RMCast_Membership_Test \ + RMCast_Retransmission_Test PSRC=$(addsuffix .cpp,$(BIN)) LDLIBS = -lACE_RMCast @@ -674,4 +675,167 @@ endif $(ACE_ROOT)/ace/Stream_Modules.h \ $(ACE_ROOT)/ace/Stream_Modules.cpp +.obj/RMCast_Retransmission_Test.o .obj/RMCast_Retransmission_Test.so .shobj/RMCast_Retransmission_Test.o .shobj/RMCast_Retransmission_Test.so: RMCast_Retransmission_Test.cpp \ + ../test_config.h \ + $(ACE_ROOT)/ace/pre.h \ + $(ACE_ROOT)/ace/post.h \ + $(ACE_ROOT)/ace/ACE_export.h \ + $(ACE_ROOT)/ace/svc_export.h \ + $(ACE_ROOT)/ace/ace_wchar.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/OS_Dirent.h \ + $(ACE_ROOT)/ace/OS_Export.h \ + $(ACE_ROOT)/ace/OS_Dirent.inl \ + $(ACE_ROOT)/ace/OS_String.h \ + $(ACE_ROOT)/ace/OS_String.inl \ + $(ACE_ROOT)/ace/OS_Memory.h \ + $(ACE_ROOT)/ace/OS_Memory.inl \ + $(ACE_ROOT)/ace/OS_TLI.h \ + $(ACE_ROOT)/ace/OS_TLI.inl \ + $(ACE_ROOT)/ace/Min_Max.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Singleton.h \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.h \ + $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread_Adapter.inl \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Synch_T.cpp \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Singleton.i \ + $(ACE_ROOT)/ace/Singleton.cpp \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Managed_Object.cpp \ + $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Module.h \ + $(ACE_ROOT)/ace/RMCast/RMCast.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Export.h \ + $(ACE_ROOT)/ace/RMCast/RMCast.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Module.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Retransmission.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Worker.h \ + $(ACE_ROOT)/ace/RMCast/RMCast_Worker.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Worker.cpp \ + $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.i \ + $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.cpp \ + $(ACE_ROOT)/ace/RB_Tree.h \ + $(ACE_ROOT)/ace/Functor.h \ + $(ACE_ROOT)/ace/Functor.i \ + $(ACE_ROOT)/ace/Functor_T.h \ + $(ACE_ROOT)/ace/Functor_T.i \ + $(ACE_ROOT)/ace/Functor_T.cpp \ + $(ACE_ROOT)/ace/RB_Tree.i \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc_Base.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.h \ + $(ACE_ROOT)/ace/Based_Pointer_T.i \ + $(ACE_ROOT)/ace/Based_Pointer_T.cpp \ + $(ACE_ROOT)/ace/Based_Pointer_Repository.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Free_List.cpp \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Malloc_T.cpp \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Containers_T.h \ + $(ACE_ROOT)/ace/Containers_T.i \ + $(ACE_ROOT)/ace/Containers_T.cpp \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/RB_Tree.cpp \ + $(ACE_ROOT)/ace/RMCast/RMCast_Retransmission.i \ + $(ACE_ROOT)/ace/Task.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i \ + $(ACE_ROOT)/ace/Task.i \ + $(ACE_ROOT)/ace/Task_T.h \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/Message_Block_T.h \ + $(ACE_ROOT)/ace/Message_Block_T.i \ + $(ACE_ROOT)/ace/Message_Block_T.cpp \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Message_Queue_T.h \ + $(ACE_ROOT)/ace/Message_Queue_T.i \ + $(ACE_ROOT)/ace/Message_Queue_T.cpp \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/SString.h \ + $(ACE_ROOT)/ace/SString.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Timer_Queue_T.cpp \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Synch_Options.i \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager_T.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager_T.i \ + $(ACE_ROOT)/ace/Hash_Map_Manager_T.cpp \ + $(ACE_ROOT)/ace/Strategies_T.i \ + $(ACE_ROOT)/ace/Strategies_T.cpp \ + $(ACE_ROOT)/ace/Service_Repository.h \ + $(ACE_ROOT)/ace/Service_Types.h \ + $(ACE_ROOT)/ace/Service_Types.i \ + $(ACE_ROOT)/ace/Service_Repository.i \ + $(ACE_ROOT)/ace/WFMO_Reactor.h \ + $(ACE_ROOT)/ace/Process_Mutex.h \ + $(ACE_ROOT)/ace/Process_Mutex.inl \ + $(ACE_ROOT)/ace/WFMO_Reactor.i \ + $(ACE_ROOT)/ace/Strategies.i \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Task_T.i \ + $(ACE_ROOT)/ace/Task_T.cpp \ + $(ACE_ROOT)/ace/Module.h \ + $(ACE_ROOT)/ace/Module.i \ + $(ACE_ROOT)/ace/Module.cpp \ + $(ACE_ROOT)/ace/Stream_Modules.h \ + $(ACE_ROOT)/ace/Stream_Modules.cpp + # IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/tests/RMCast/RMCast_Membership_Test.cpp b/tests/RMCast/RMCast_Membership_Test.cpp index f1e320b43ef..240382ba758 100644 --- a/tests/RMCast/RMCast_Membership_Test.cpp +++ b/tests/RMCast/RMCast_Membership_Test.cpp @@ -40,7 +40,7 @@ public: return this->joined_; } //! Set the flag to remember if this proxy has joined the group or - //! not. + //! not. void joined (int j) { this->joined_ = j; @@ -286,12 +286,12 @@ Tester::ack (ACE_RMCast::Ack &ack) // ACE_DEBUG ((LM_DEBUG, // "Received ack in Tester %d,%d\n", - // ack.highest_in_sequence, + // ack.next_expected, // ack.highest_received)); // Assume the lock is held, verify that the ack message satisfy the // invariants... - ACE_UINT32 highest_in_sequence; + ACE_UINT32 next_expected; ACE_UINT32 highest_received; int set = 0; for (size_t i = 0; i != nproxy; ++i) @@ -301,18 +301,18 @@ Tester::ack (ACE_RMCast::Ack &ack) if (!set) { set = 1; - highest_in_sequence = this->proxy_[i].highest_in_sequence (); + next_expected = this->proxy_[i].next_expected (); highest_received = this->proxy_[i].highest_received (); } else { - if (highest_in_sequence > - this->proxy_[i].highest_in_sequence ()) + if (next_expected > + this->proxy_[i].next_expected ()) { - highest_in_sequence = - this->proxy_[i].highest_in_sequence (); + next_expected = + this->proxy_[i].next_expected (); } - if (highest_received < + if (highest_received < this->proxy_[i].highest_received ()) { highest_received = @@ -325,10 +325,10 @@ Tester::ack (ACE_RMCast::Ack &ack) return 0; // Check the invariants - if (ack.highest_in_sequence != highest_in_sequence) + if (ack.next_expected != next_expected) { ACE_ERROR_RETURN ((LM_ERROR, - "Invalid highest_in_sequence in Ack\n"), + "Invalid next_expected in Ack\n"), -1); } if (ack.highest_received != highest_received) @@ -395,8 +395,8 @@ Tester::generate_acks (int iterations) ACE_RMCast::Ack ack; ack.source = &this->proxy_[i]; - ack.highest_in_sequence = - this->proxy_[i].highest_in_sequence (); + ack.next_expected = + this->proxy_[i].next_expected (); ack.highest_received = this->proxy_[i].highest_received (); @@ -415,13 +415,13 @@ Tester::generate_acks (int iterations) ack.highest_received++; break; default: - if (ack.highest_received > ack.highest_in_sequence) - ack.highest_in_sequence++; + if (ack.highest_received > ack.next_expected) + ack.next_expected++; break; } // ACE_DEBUG ((LM_DEBUG, // "Sending ack message (%d,%d) through proxy %d\n", - // ack.highest_in_sequence, + // ack.next_expected, // ack.highest_received, // i)); int result = this->proxy_[i].ack (ack); diff --git a/tests/RMCast/RMCast_Retransmission_Test.cpp b/tests/RMCast/RMCast_Retransmission_Test.cpp new file mode 100644 index 00000000000..a40c0fd4177 --- /dev/null +++ b/tests/RMCast/RMCast_Retransmission_Test.cpp @@ -0,0 +1,457 @@ +// $Id$ + +#include "test_config.h" +#include "ace/RMCast/RMCast_Proxy.h" +#include "ace/RMCast/RMCast_Retransmission.h" + +#include "ace/Task.h" + +ACE_RCSID(tests, RMCast_Retransmission_Test, "$Id$") + +// **************************************************************** + +class Tester; + +//! Simple proxy for the ACE_RMCast_Retransmission test harness +/*! + * Implement a simple version of the ACE_RMCast_Proxy class used in + * the ACE_RMCast_Retransmission test harness. + */ +class Test_Proxy : public ACE_RMCast_Proxy +{ +public: + Test_Proxy (void); + + void set_tester (Tester *tester) + { + this->tester_ = tester; + } + + //! Get the flag to remember if this proxy has joined the group or + //! not. + int joined (void) const + { + return this->joined_; + } + + //! Set the flag to remember if this proxy has joined the group or + //! not. + void joined (int j) + { + this->joined_ = j; + } + + //! Most of the reply_ methods just return 0, there is no real remote + //! peer, this is just a test harness + //@{ + virtual int reply_data (ACE_RMCast::Data &) + { + return 0; + } + virtual int reply_poll (ACE_RMCast::Poll &) + { + return 0; + } + //! Must check that that sequence number is reasonable + virtual int reply_ack_join (ACE_RMCast::Ack_Join &); + virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &) + { + return 0; + } + virtual int reply_ack (ACE_RMCast::Ack &) + { + return 0; + } + virtual int reply_join (ACE_RMCast::Join &) + { + return 0; + } + virtual int reply_leave (ACE_RMCast::Leave &) + { + return 0; + } + //@} + + int data (ACE_RMCast::Data &data); + int ack (ACE_RMCast::Ack &ack); + +private: + //! Remember if we joined the group already. + int joined_; + + //! Keep a reference to the main testing class so it can be called + //! back. + Tester *tester_; + + //! The test is randomized to get better coverage. This is the seed + //! variable for the test + ACE_RANDR_TYPE seed_; + + //! Synchronize internal data structures. + ACE_SYNCH_MUTEX lock_; +}; + +// **************************************************************** + +//! The number of proxies used in the test +/*! + * Not all member will be present in the group at the same time. But + * this variable controls the maximum number + */ +const size_t nproxy = 16; +//! A simple module to receive the messages from ACE_RMCast_Retransmission +/*! + * The ACE_RMCast_Retransmission layer pushes messages to its next module + * when all the members have acked a message, when a new member joins, + * when a member leaves, etc. + * This class will verify that the messages are exactly what we + * expect. + */ +class Tester : public ACE_RMCast_Module +{ +public: + Tester (void); + + //! Run the test for \iterations times + void run (int iterations); + + //! One of the proxies has received an Ack_Join message, we need to + //! validate it + int reply_ack_join (Test_Proxy *proxy, ACE_RMCast::Ack_Join &ack_join); + + //! One of the proxies has received an Ack message. + /*! + * In this method we simulate the role of the Membership layer, + * however we just do brute force instead of trying to optimize the + * Ack processing + */ + int ack (Test_Proxy *proxy, ACE_RMCast::Ack &ack); + + virtual int data (ACE_RMCast::Data &data); + +private: + //! Generate a new message to drive the test + void generate_one_message (void); + + //! Send a single Ack that summarizes the state of all the proxies. + int send_ack (void); + + //! Ask the retransmission layer to resend lost messages + int resend (void); + +private: + //! The array of proxies + Test_Proxy proxy_[nproxy]; + + //! The Retransmission layer + ACE_RMCast_Retransmission retransmission_; + + //! Synchronize internal data structures + ACE_SYNCH_MUTEX lock_; + + //! The test is randomized to get better coverage. This is the seed + //! variable for the test + ACE_RANDR_TYPE seed_; + + //! Generate sequence numbers for the Retransmission layer + ACE_UINT32 sequence_number_generator_; +}; + +// **************************************************************** + +//! An Adapter to run Tester::run the test is a separate thread +class Task : public ACE_Task_Base +{ +public: + Task (Tester *tester); + + // = Read the documentation in "ace/Task.h" + int svc (void); + +private: + //! The tester object. + Tester *tester_; +}; + +// **************************************************************** + +int +main (int, ACE_TCHAR *[]) +{ + ACE_START_TEST (ACE_TEXT ("RMCast_Retransmission_Test")); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("This is ACE Version %u.%u.%u\n\n"), + ACE::major_version(), + ACE::minor_version(), + ACE::beta_version())); + + { + ACE_DEBUG ((LM_DEBUG, "Running single threaded test\n")); + //! Run the test in single threaded mode + Tester tester; + tester.run (100); + } + { + ACE_DEBUG ((LM_DEBUG, "Running multi threaded test\n")); + //! Run the test in multi-threaded mode + Tester tester; + Task task (&tester); + if (task.activate (THR_NEW_LWP|THR_JOINABLE, 4) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Cannot activate the threads\n"), 1); + ACE_Thread_Manager::instance ()->wait (); + } + + ACE_END_TEST; + return 0; +} + +// **************************************************************** + +Tester::Tester (void) + : seed_ (ACE_OS::gethrtime ()) + , sequence_number_generator_ (0) +{ + // Initialize the stack... + this->retransmission_.next (this); + + for (size_t i = 0; i != nproxy; ++i) + { + this->proxy_[i].set_tester (this); + this->proxy_[i].joined (1); + } +} + +void +Tester::run (int iterations) +{ + for (int i = 0; i < iterations; ++i) + { + // Push data + this->generate_one_message (); + + this->send_ack (); + + // Re-send unacked data + if (i % 5 == 0) + { + this->resend (); + this->send_ack (); + } + } + + while (this->resend () != 0) + { + this->send_ack (); + } +} + +int +Tester::ack (Test_Proxy *, ACE_RMCast::Ack &) +{ + return this->send_ack (); +} + +int +Tester::send_ack () +{ + ACE_RMCast::Ack ack; + + int set = 0; + for (size_t i = 0; i != nproxy; ++i) + { + if (this->proxy_[i].joined () == 0) + continue; + if (!set) + { + ack.next_expected = this->proxy_[i].next_expected (); + ack.highest_received = this->proxy_[i].highest_received (); + set = 1; + } + else + { + if (ack.next_expected > this->proxy_[i].next_expected ()) + ack.next_expected = this->proxy_[i].next_expected (); + if (ack.highest_received < this->proxy_[i].highest_received ()) + ack.highest_received = this->proxy_[i].highest_received (); + } + } + if (!set) + return 0; + ACE_DEBUG ((LM_DEBUG, "Tested::ack - (%d,%d)\n", + ack.next_expected, ack.highest_received)); + return this->retransmission_.ack (ack); +} + +void +Tester::generate_one_message (void) +{ + ACE_Message_Block payload (1024); + payload.wr_ptr (1024); + + ACE_RMCast::Data data; + data.payload = &payload; + data.sequence_number = -1; + + int result = this->retransmission_.data (data); + if (result != 0) + { + ACE_ERROR ((LM_ERROR, + "Retransmission::data returned %d\n", + result)); + } +} + +int +Tester::resend (void) +{ + ACE_UINT32 max_sequence_number = + this->sequence_number_generator_; + int r = this->retransmission_.resend (max_sequence_number); + if (r == -1) + { + ACE_DEBUG ((LM_DEBUG, + "Error returned from Retransmission::resend\n")); + } + return r; +} + +int +Tester::data (ACE_RMCast::Data &data) +{ + // After going through the Retransmission layer we got some data, + // simulate the work of the following layers: + // - Fragmentation: assing message sequence numbers + // - IO_XXX: send to all known members + // - Reassembly: reconstruct the message on the receiving side. + + if (data.sequence_number == -1) + { + data.total_size = 1024; + data.fragment_offset = 0; + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + data.sequence_number = this->sequence_number_generator_++; + } + for (size_t i = 0; i != nproxy; ++i) + { + int result = this->proxy_[i].data (data); + if (result != 0) + { + ACE_ERROR ((LM_ERROR, + "Proxy::data returned %d for proxy %d\n", + result, i)); + return -1; + } + } + return 0; +} + +// **************************************************************** + +Task::Task (Tester *tester) + : tester_ (tester) +{ +} + +int +Task::svc (void) +{ + this->tester_->run (100); + return 0; +} + +// **************************************************************** + +Test_Proxy::Test_Proxy (void) + : joined_ (0) + , tester_ (0) + , seed_ (ACE_static_cast(ACE_RANDR_TYPE,ACE_OS::gethrtime ())) +{ +} + +int +Test_Proxy::data (ACE_RMCast::Data &data) +{ + ACE_DEBUG ((LM_DEBUG, " (%t) Proxy receives message %d\n", + data.sequence_number)); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + int c = ACE_OS::rand_r (this->seed_) % 100; + + const int success_ratio = 95; + + if (this->next_expected () > data.sequence_number) + { + // An old message, resend the ack... + ACE_RMCast::Ack ack; + ack.next_expected = this->next_expected (); + ack.highest_received = this->highest_received (); + ACE_DEBUG ((LM_DEBUG, "....it is an already accepted message\n")); + // Ack the message.... + return this->ack (ack); + } + + if (this->next_expected () == data.sequence_number) + { + // The message is the next one that we expected... + // Two choices: accept the message as successfully received or + // reject it, we accept them most of the time. + if (c > success_ratio) + { + // We ignore the message completely as if it was lost in the + // network + ACE_DEBUG ((LM_DEBUG, "....and drops it\n")); + return 0; + } + + ACE_RMCast::Ack ack; + ack.next_expected = data.sequence_number + 1; + if (ack.next_expected < this->highest_received ()) + { + ack.highest_received = this->highest_received (); + } + else + { + ack.highest_received = data.sequence_number; + } + ACE_DEBUG ((LM_DEBUG, "....and accepts it\n")); + // Ack the message.... + return this->ack (ack); + } + + ACE_DEBUG ((LM_DEBUG, "....the message is out of order\n")); + // This is an out of sequence number, maybe it is lost... + if (c > success_ratio) + { + ACE_DEBUG ((LM_DEBUG, "........and is dropped\n")); + return 0; + } + + ACE_DEBUG ((LM_DEBUG, "........and is accepted\n")); + ACE_RMCast::Ack ack; + ack.next_expected = this->next_expected (); + if (data.sequence_number < this->highest_received ()) + { + ack.highest_received = this->highest_received (); + } + else + { + ack.highest_received = data.sequence_number; + } + // Ack the message.... + return this->ack (ack); +} + +int +Test_Proxy::ack (ACE_RMCast::Ack &ack) +{ + (void) this->ACE_RMCast_Proxy::ack (ack); + (void) this->tester_->ack (this, ack); + return 0; +} + +int +Test_Proxy::reply_ack_join (ACE_RMCast::Ack_Join & /* ack_join */) +{ + return 0; // this->tester_->proxy_reply_ack_join (this, ack_join); +} diff --git a/tests/RMCast/RMCast_Retransmission_Test.dsp b/tests/RMCast/RMCast_Retransmission_Test.dsp new file mode 100644 index 00000000000..b60a4a9c677 --- /dev/null +++ b/tests/RMCast/RMCast_Retransmission_Test.dsp @@ -0,0 +1,96 @@ +# Microsoft Developer Studio Project File - Name="RMCast_Retransmission_Test" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=RMCast_Retransmission_Test - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_Retransmission_Test.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_Retransmission_Test.mak" CFG="RMCast_Retransmission_Test - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "RMCast_Retransmission_Test - Win32 Release" (based on "Win32 (x86) Console Application")
+!MESSAGE "RMCast_Retransmission_Test - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "RMCast_Retransmission_Test - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\.." /I ".." /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /machine:I386
+# ADD LINK32 ace.lib ACE_RMCast.lib /nologo /subsystem:console /machine:I386 /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ELSEIF "$(CFG)" == "RMCast_Retransmission_Test - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "Debug"
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\.." /I ".." /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 ACE_RMCastd.lib aced.lib /nologo /subsystem:console /debug /machine:I386 /out:"RMCast_Retransmission_Test.exe" /pdbtype:sept /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ENDIF
+
+# Begin Target
+
+# Name "RMCast_Retransmission_Test - Win32 Release"
+# Name "RMCast_Retransmission_Test - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
+# Begin Source File
+
+SOURCE=.\RMCast_Retransmission_Test.cpp
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/tests/RMCast/RMCast_Tests.dsw b/tests/RMCast/RMCast_Tests.dsw index 5dc90ff201b..e9e6820ccc2 100644 --- a/tests/RMCast/RMCast_Tests.dsw +++ b/tests/RMCast/RMCast_Tests.dsw @@ -39,6 +39,18 @@ Package=<4> ###############################################################################
+Project: "RMCast_Retransmission_Test"=.\RMCast_Retransmission_Test.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "RMCast_UDP_Best_Effort_Test"=.\RMCast_UDP_Best_Effort_Test.dsp - Package Owner=<4>
Package=<5>
|