summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-04 00:10:30 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-10-04 00:10:30 +0000
commitf3771b1e83267ab8fb41c1bcd1830e39d382ba03 (patch)
tree9c3c15f5b079379f81cb5f7ba56feb809f264c0f
parentd93c1a45bb44b5a9210b425c43a9f90684b2efed (diff)
downloadATCD-f3771b1e83267ab8fb41c1bcd1830e39d382ba03.tar.gz
ChangeLogTag:Tue Oct 3 17:07:37 2000 Carlos O'Ryan <coryan@uci.edu>
-rw-r--r--ChangeLog85
-rw-r--r--ChangeLogs/ChangeLog-02a85
-rw-r--r--ChangeLogs/ChangeLog-03a85
-rw-r--r--ace/RMCast/Makefile98
-rw-r--r--ace/RMCast/RMCast.h2
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.cpp56
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.h77
-rw-r--r--ace/RMCast/RMCast_Copy_On_Write.i1
-rw-r--r--ace/RMCast/RMCast_Reordering.cpp153
-rw-r--r--ace/RMCast/RMCast_Reordering.h94
-rw-r--r--ace/RMCast/RMCast_Reordering.i8
-rw-r--r--ace/RMCast/RMCast_Retransmission.cpp19
-rw-r--r--protocols/ace/RMCast/Makefile98
-rw-r--r--protocols/ace/RMCast/RMCast.h2
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.cpp56
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.h77
-rw-r--r--protocols/ace/RMCast/RMCast_Copy_On_Write.i1
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.cpp153
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.h94
-rw-r--r--protocols/ace/RMCast/RMCast_Reordering.i8
-rw-r--r--protocols/ace/RMCast/RMCast_Retransmission.cpp19
-rw-r--r--tests/RMCast/Makefile165
-rw-r--r--tests/RMCast/RMCast_Reordering_Test.cpp313
-rw-r--r--tests/RMCast/RMCast_Reordering_Test.dsp96
-rw-r--r--tests/RMCast/RMCast_Retransmission_Test.cpp26
-rw-r--r--tests/RMCast/RMCast_Tests.dsw12
26 files changed, 1650 insertions, 233 deletions
diff --git a/ChangeLog b/ChangeLog
index a4918748a7d..3d93f4b8e93 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,30 @@
+Tue Oct 3 17:07:37 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/Makefile:
+ * ace/RMCast/RMCast_Reordering.h:
+ * ace/RMCast/RMCast_Reordering.i:
+ * ace/RMCast/RMCast_Reordering.cpp:
+ Add re-ordering module, this is the receiver-side module that
+ acks messages and delivers them in order to the application.
+
+ * ace/RMCast/RMCast.h:
+ Fixed data type in the Ack_Join message
+
+ * ace/RMCast/RMCast_Copy_On_Write.h:
+ * ace/RMCast/RMCast_Copy_On_Write.i:
+ * ace/RMCast/RMCast_Copy_On_Write.cpp:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Cleanup the Copy_On_Write implementation
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_Reordering_Test.cpp:
+ * tests/RMCast/RMCast_Reordering_Test.dsp:
+ Add test for the reordering module.
+
+ * tests/RMCast/RMCast_Retransmission_Test.cpp:
+ Remove debug messages for the retransmission stuff.
+
Tue Oct 3 15:03:42 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/OS.i:
@@ -17,11 +44,11 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/OS.i:
Expanded TCHAR versions of some string macros to have
- both _A and _W versions. Moved many of these
+ both _A and _W versions. Moved many of these
definitions around, since a lot of the same code was
- being repeated. The macros changed are:
+ being repeated. The macros changed are:
ACE_DIRECTORY_SEPARATOR_STR, ACE_DIRECTORY_SEPARATOR_CHAR,
- ACE_PLATFORM, ACE_PLATFORM_EXE_SUFFIX,
+ ACE_PLATFORM, ACE_PLATFORM_EXE_SUFFIX,
ACE_DEFAULT_LOCALNAME, ACE_DEFAULT_GLOBALNAME and
ACE_DEFAULT_MUTEX. [Bug 674]
@@ -32,7 +59,7 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Process_Mutex.h:
Changed the ACE_Process_Mutex constructor to take a
- char as a argument instead of TCHAR. Also added a
+ char as a argument instead of TCHAR. Also added a
version that takes in wchar_t.
* ace/Synch.cpp:
@@ -45,10 +72,10 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/ace_wchar.h:
* ace/ace_wchar.inl: (added)
- Moved the method definitions from inlined in the
+ Moved the method definitions from inlined in the
class definition to the .inl file.
- Added typedefs for ACE_OS_C/WString to the
+ Added typedefs for ACE_OS_C/WString to the
ACE_Wide_To_Ascii/Asci_To_Wide classes for legacy
support. [Bug 675]
@@ -78,17 +105,17 @@ Tue Oct 3 12:25:46 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Thread_Adapter.cpp:
* ace/Thread_Adapter.h:
- Separated ACE_Thread_Adapter::invoke into invoke and
+ Separated ACE_Thread_Adapter::invoke into invoke and
invoke_i because it uses SEH and James added local vars
with destructors. They don't work together in the same
function.
Tue Oct 03 12:53:28 2000 Angelo Corsaro <corsaro@cs.wustl.edu>
- * bin/run_all_list.pm:
+ * bin/run_all_list.pm:
- Disabled Connection Purging test, and enabled Exposed
- Policies test.
+ Disabled Connection Purging test, and enabled Exposed
+ Policies test.
Tue Oct 3 10:52:24 2000 Darrell Brunsch <brunsch@uci.edu>
@@ -98,39 +125,39 @@ Tue Oct 3 10:52:24 2000 Darrell Brunsch <brunsch@uci.edu>
Tue Oct 3 10:40:04 PDT 2000 James Hu <jxh@entera.com>
- * ace/Thread_Adapter.cpp:
- * ace/Thread_Exit.cpp:
- * ace/Thread_Exit.h:
+ * ace/Thread_Adapter.cpp:
+ * ace/Thread_Exit.cpp:
+ * ace/Thread_Exit.h:
- Changed this check to creating an ACE_Thread_Exit off the
+ Changed this check to creating an ACE_Thread_Exit off the
heap via a smart pointeresque way of the TSS instance
fails to create.
Tue Oct 3 05:55:01 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- * ace/Hash_Map_Manager_T.cpp: Added && (__SUNPRO_CC == 0x420)
- to the #elif defined (__SUNPRO_CC) to make things work properly
- on the Sun FORTE compiler. Thanks to Russ Noseworthy for
- reporting this.
+ * ace/Hash_Map_Manager_T.cpp: Added && (__SUNPRO_CC == 0x420)
+ to the #elif defined (__SUNPRO_CC) to make things work properly
+ on the Sun FORTE compiler. Thanks to Russ Noseworthy for
+ reporting this.
Mon Oct 2 18:49:06 PDT 2000 James Hu <jxh@entera.com>
- * ace/Thread_Adapter.cpp:
+ * ace/Thread_Adapter.cpp:
- If the TSS instance of the ACE_Thread_Exit class fails to
- create, the application will crash. The new code will detect
- the failure, wait half a second, and try again. This
- solution, while cheesy, corrects a problem Entera
- encountered, and avoids the problems my previous attempt
- caused with ACE.
+ If the TSS instance of the ACE_Thread_Exit class fails to
+ create, the application will crash. The new code will detect
+ the failure, wait half a second, and try again. This
+ solution, while cheesy, corrects a problem Entera
+ encountered, and avoids the problems my previous attempt
+ caused with ACE.
Mon Oct 2 13:38:44 2000 Ossama Othman <ossama@uci.edu>
- * examples/QOS/QoS_Signal_Handler.cpp:
- * examples/QOS/QoS_Util.cpp:
+ * examples/QOS/QoS_Signal_Handler.cpp:
+ * examples/QOS/QoS_Util.cpp:
- Include "ace/Log_Msg.h" to pull in definitions of the
- ACE_{DEBUG,ERROR} macros.
+ Include "ace/Log_Msg.h" to pull in definitions of the
+ ACE_{DEBUG,ERROR} macros.
Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index a4918748a7d..3d93f4b8e93 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,30 @@
+Tue Oct 3 17:07:37 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/Makefile:
+ * ace/RMCast/RMCast_Reordering.h:
+ * ace/RMCast/RMCast_Reordering.i:
+ * ace/RMCast/RMCast_Reordering.cpp:
+ Add re-ordering module, this is the receiver-side module that
+ acks messages and delivers them in order to the application.
+
+ * ace/RMCast/RMCast.h:
+ Fixed data type in the Ack_Join message
+
+ * ace/RMCast/RMCast_Copy_On_Write.h:
+ * ace/RMCast/RMCast_Copy_On_Write.i:
+ * ace/RMCast/RMCast_Copy_On_Write.cpp:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Cleanup the Copy_On_Write implementation
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_Reordering_Test.cpp:
+ * tests/RMCast/RMCast_Reordering_Test.dsp:
+ Add test for the reordering module.
+
+ * tests/RMCast/RMCast_Retransmission_Test.cpp:
+ Remove debug messages for the retransmission stuff.
+
Tue Oct 3 15:03:42 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/OS.i:
@@ -17,11 +44,11 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/OS.i:
Expanded TCHAR versions of some string macros to have
- both _A and _W versions. Moved many of these
+ both _A and _W versions. Moved many of these
definitions around, since a lot of the same code was
- being repeated. The macros changed are:
+ being repeated. The macros changed are:
ACE_DIRECTORY_SEPARATOR_STR, ACE_DIRECTORY_SEPARATOR_CHAR,
- ACE_PLATFORM, ACE_PLATFORM_EXE_SUFFIX,
+ ACE_PLATFORM, ACE_PLATFORM_EXE_SUFFIX,
ACE_DEFAULT_LOCALNAME, ACE_DEFAULT_GLOBALNAME and
ACE_DEFAULT_MUTEX. [Bug 674]
@@ -32,7 +59,7 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Process_Mutex.h:
Changed the ACE_Process_Mutex constructor to take a
- char as a argument instead of TCHAR. Also added a
+ char as a argument instead of TCHAR. Also added a
version that takes in wchar_t.
* ace/Synch.cpp:
@@ -45,10 +72,10 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/ace_wchar.h:
* ace/ace_wchar.inl: (added)
- Moved the method definitions from inlined in the
+ Moved the method definitions from inlined in the
class definition to the .inl file.
- Added typedefs for ACE_OS_C/WString to the
+ Added typedefs for ACE_OS_C/WString to the
ACE_Wide_To_Ascii/Asci_To_Wide classes for legacy
support. [Bug 675]
@@ -78,17 +105,17 @@ Tue Oct 3 12:25:46 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Thread_Adapter.cpp:
* ace/Thread_Adapter.h:
- Separated ACE_Thread_Adapter::invoke into invoke and
+ Separated ACE_Thread_Adapter::invoke into invoke and
invoke_i because it uses SEH and James added local vars
with destructors. They don't work together in the same
function.
Tue Oct 03 12:53:28 2000 Angelo Corsaro <corsaro@cs.wustl.edu>
- * bin/run_all_list.pm:
+ * bin/run_all_list.pm:
- Disabled Connection Purging test, and enabled Exposed
- Policies test.
+ Disabled Connection Purging test, and enabled Exposed
+ Policies test.
Tue Oct 3 10:52:24 2000 Darrell Brunsch <brunsch@uci.edu>
@@ -98,39 +125,39 @@ Tue Oct 3 10:52:24 2000 Darrell Brunsch <brunsch@uci.edu>
Tue Oct 3 10:40:04 PDT 2000 James Hu <jxh@entera.com>
- * ace/Thread_Adapter.cpp:
- * ace/Thread_Exit.cpp:
- * ace/Thread_Exit.h:
+ * ace/Thread_Adapter.cpp:
+ * ace/Thread_Exit.cpp:
+ * ace/Thread_Exit.h:
- Changed this check to creating an ACE_Thread_Exit off the
+ Changed this check to creating an ACE_Thread_Exit off the
heap via a smart pointeresque way of the TSS instance
fails to create.
Tue Oct 3 05:55:01 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- * ace/Hash_Map_Manager_T.cpp: Added && (__SUNPRO_CC == 0x420)
- to the #elif defined (__SUNPRO_CC) to make things work properly
- on the Sun FORTE compiler. Thanks to Russ Noseworthy for
- reporting this.
+ * ace/Hash_Map_Manager_T.cpp: Added && (__SUNPRO_CC == 0x420)
+ to the #elif defined (__SUNPRO_CC) to make things work properly
+ on the Sun FORTE compiler. Thanks to Russ Noseworthy for
+ reporting this.
Mon Oct 2 18:49:06 PDT 2000 James Hu <jxh@entera.com>
- * ace/Thread_Adapter.cpp:
+ * ace/Thread_Adapter.cpp:
- If the TSS instance of the ACE_Thread_Exit class fails to
- create, the application will crash. The new code will detect
- the failure, wait half a second, and try again. This
- solution, while cheesy, corrects a problem Entera
- encountered, and avoids the problems my previous attempt
- caused with ACE.
+ If the TSS instance of the ACE_Thread_Exit class fails to
+ create, the application will crash. The new code will detect
+ the failure, wait half a second, and try again. This
+ solution, while cheesy, corrects a problem Entera
+ encountered, and avoids the problems my previous attempt
+ caused with ACE.
Mon Oct 2 13:38:44 2000 Ossama Othman <ossama@uci.edu>
- * examples/QOS/QoS_Signal_Handler.cpp:
- * examples/QOS/QoS_Util.cpp:
+ * examples/QOS/QoS_Signal_Handler.cpp:
+ * examples/QOS/QoS_Util.cpp:
- Include "ace/Log_Msg.h" to pull in definitions of the
- ACE_{DEBUG,ERROR} macros.
+ Include "ace/Log_Msg.h" to pull in definitions of the
+ ACE_{DEBUG,ERROR} macros.
Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index a4918748a7d..3d93f4b8e93 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,30 @@
+Tue Oct 3 17:07:37 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * ace/RMCast/Makefile:
+ * ace/RMCast/RMCast_Reordering.h:
+ * ace/RMCast/RMCast_Reordering.i:
+ * ace/RMCast/RMCast_Reordering.cpp:
+ Add re-ordering module, this is the receiver-side module that
+ acks messages and delivers them in order to the application.
+
+ * ace/RMCast/RMCast.h:
+ Fixed data type in the Ack_Join message
+
+ * ace/RMCast/RMCast_Copy_On_Write.h:
+ * ace/RMCast/RMCast_Copy_On_Write.i:
+ * ace/RMCast/RMCast_Copy_On_Write.cpp:
+ * ace/RMCast/RMCast_Retransmission.cpp:
+ Cleanup the Copy_On_Write implementation
+
+ * tests/RMCast/Makefile:
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_Reordering_Test.cpp:
+ * tests/RMCast/RMCast_Reordering_Test.dsp:
+ Add test for the reordering module.
+
+ * tests/RMCast/RMCast_Retransmission_Test.cpp:
+ Remove debug messages for the retransmission stuff.
+
Tue Oct 3 15:03:42 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/OS.i:
@@ -17,11 +44,11 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/OS.i:
Expanded TCHAR versions of some string macros to have
- both _A and _W versions. Moved many of these
+ both _A and _W versions. Moved many of these
definitions around, since a lot of the same code was
- being repeated. The macros changed are:
+ being repeated. The macros changed are:
ACE_DIRECTORY_SEPARATOR_STR, ACE_DIRECTORY_SEPARATOR_CHAR,
- ACE_PLATFORM, ACE_PLATFORM_EXE_SUFFIX,
+ ACE_PLATFORM, ACE_PLATFORM_EXE_SUFFIX,
ACE_DEFAULT_LOCALNAME, ACE_DEFAULT_GLOBALNAME and
ACE_DEFAULT_MUTEX. [Bug 674]
@@ -32,7 +59,7 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Process_Mutex.h:
Changed the ACE_Process_Mutex constructor to take a
- char as a argument instead of TCHAR. Also added a
+ char as a argument instead of TCHAR. Also added a
version that takes in wchar_t.
* ace/Synch.cpp:
@@ -45,10 +72,10 @@ Tue Oct 3 14:25:09 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/ace_wchar.h:
* ace/ace_wchar.inl: (added)
- Moved the method definitions from inlined in the
+ Moved the method definitions from inlined in the
class definition to the .inl file.
- Added typedefs for ACE_OS_C/WString to the
+ Added typedefs for ACE_OS_C/WString to the
ACE_Wide_To_Ascii/Asci_To_Wide classes for legacy
support. [Bug 675]
@@ -78,17 +105,17 @@ Tue Oct 3 12:25:46 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Thread_Adapter.cpp:
* ace/Thread_Adapter.h:
- Separated ACE_Thread_Adapter::invoke into invoke and
+ Separated ACE_Thread_Adapter::invoke into invoke and
invoke_i because it uses SEH and James added local vars
with destructors. They don't work together in the same
function.
Tue Oct 03 12:53:28 2000 Angelo Corsaro <corsaro@cs.wustl.edu>
- * bin/run_all_list.pm:
+ * bin/run_all_list.pm:
- Disabled Connection Purging test, and enabled Exposed
- Policies test.
+ Disabled Connection Purging test, and enabled Exposed
+ Policies test.
Tue Oct 3 10:52:24 2000 Darrell Brunsch <brunsch@uci.edu>
@@ -98,39 +125,39 @@ Tue Oct 3 10:52:24 2000 Darrell Brunsch <brunsch@uci.edu>
Tue Oct 3 10:40:04 PDT 2000 James Hu <jxh@entera.com>
- * ace/Thread_Adapter.cpp:
- * ace/Thread_Exit.cpp:
- * ace/Thread_Exit.h:
+ * ace/Thread_Adapter.cpp:
+ * ace/Thread_Exit.cpp:
+ * ace/Thread_Exit.h:
- Changed this check to creating an ACE_Thread_Exit off the
+ Changed this check to creating an ACE_Thread_Exit off the
heap via a smart pointeresque way of the TSS instance
fails to create.
Tue Oct 3 05:55:01 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
- * ace/Hash_Map_Manager_T.cpp: Added && (__SUNPRO_CC == 0x420)
- to the #elif defined (__SUNPRO_CC) to make things work properly
- on the Sun FORTE compiler. Thanks to Russ Noseworthy for
- reporting this.
+ * ace/Hash_Map_Manager_T.cpp: Added && (__SUNPRO_CC == 0x420)
+ to the #elif defined (__SUNPRO_CC) to make things work properly
+ on the Sun FORTE compiler. Thanks to Russ Noseworthy for
+ reporting this.
Mon Oct 2 18:49:06 PDT 2000 James Hu <jxh@entera.com>
- * ace/Thread_Adapter.cpp:
+ * ace/Thread_Adapter.cpp:
- If the TSS instance of the ACE_Thread_Exit class fails to
- create, the application will crash. The new code will detect
- the failure, wait half a second, and try again. This
- solution, while cheesy, corrects a problem Entera
- encountered, and avoids the problems my previous attempt
- caused with ACE.
+ If the TSS instance of the ACE_Thread_Exit class fails to
+ create, the application will crash. The new code will detect
+ the failure, wait half a second, and try again. This
+ solution, while cheesy, corrects a problem Entera
+ encountered, and avoids the problems my previous attempt
+ caused with ACE.
Mon Oct 2 13:38:44 2000 Ossama Othman <ossama@uci.edu>
- * examples/QOS/QoS_Signal_Handler.cpp:
- * examples/QOS/QoS_Util.cpp:
+ * examples/QOS/QoS_Signal_Handler.cpp:
+ * examples/QOS/QoS_Util.cpp:
- Include "ace/Log_Msg.h" to pull in definitions of the
- ACE_{DEBUG,ERROR} macros.
+ Include "ace/Log_Msg.h" to pull in definitions of the
+ ACE_{DEBUG,ERROR} macros.
Mon Oct 2 11:29:47 2000 Carlos O'Ryan <coryan@uci.edu>
diff --git a/ace/RMCast/Makefile b/ace/RMCast/Makefile
index e97cd885493..868795fa82c 100644
--- a/ace/RMCast/Makefile
+++ b/ace/RMCast/Makefile
@@ -18,6 +18,7 @@ FILES= \
RMCast_Proxy \
RMCast_Membership \
RMCast_Retransmission \
+ RMCast_Reordering \
\
RMCast_IO_UDP \
RMCast_UDP_Event_Handler \
@@ -710,6 +711,103 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Message_Block_T.i \
$(ACE_ROOT)/ace/Message_Block_T.cpp
+.obj/RMCast_Reordering.o .obj/RMCast_Reordering.so .shobj/RMCast_Reordering.o .shobj/RMCast_Reordering.so: RMCast_Reordering.cpp RMCast_Reordering.h \
+ $(ACE_ROOT)/ace/pre.h \
+ RMCast_Module.h RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/svc_export.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/OS.i \
+ RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \
+ RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.i \
+ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \
+ $(ACE_ROOT)/ace/Based_Pointer_Repository.h \
+ $(ACE_ROOT)/ace/Singleton.h \
+ $(ACE_ROOT)/ace/Singleton.i \
+ $(ACE_ROOT)/ace/Singleton.cpp \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Managed_Object.cpp \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Free_List.cpp \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Malloc_T.cpp \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Containers_T.h \
+ $(ACE_ROOT)/ace/Containers_T.i \
+ $(ACE_ROOT)/ace/Containers_T.cpp \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/RB_Tree.cpp \
+ RMCast_Reordering.i RMCast_Proxy.h RMCast_Proxy.i \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/Message_Block_T.h \
+ $(ACE_ROOT)/ace/Message_Block_T.i \
+ $(ACE_ROOT)/ace/Message_Block_T.cpp
+
.obj/RMCast_IO_UDP.o .obj/RMCast_IO_UDP.so .shobj/RMCast_IO_UDP.o .shobj/RMCast_IO_UDP.so: RMCast_IO_UDP.cpp RMCast_IO_UDP.h \
$(ACE_ROOT)/ace/pre.h \
RMCast_Module.h RMCast.h \
diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h
index df3a0d48858..55e9b5d0368 100644
--- a/ace/RMCast/RMCast.h
+++ b/ace/RMCast/RMCast.h
@@ -207,7 +207,7 @@ public:
*/
struct Ack_Join
{
- ACE_INT32 next_sequence_number;
+ ACE_UINT32 next_sequence_number;
//! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
diff --git a/ace/RMCast/RMCast_Copy_On_Write.cpp b/ace/RMCast/RMCast_Copy_On_Write.cpp
index f1553c7f4ab..20a76e559e0 100644
--- a/ace/RMCast/RMCast_Copy_On_Write.cpp
+++ b/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -39,11 +39,8 @@ ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
ACE_RMCast_Copy_On_Write (void)
- : pending_writes_ (0)
- , writing_ (0)
- , cond_ (mutex_)
+ : ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> ()
{
- ACE_NEW (this->collection_, Collection);
}
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
@@ -69,8 +66,10 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
{
int r = worker->work ((*i).key (), (*i).item ());
- if (r != 0)
- return r;
+ if (r == 1)
+ return 0; // Abort loop, but no error
+ if (r == -1)
+ return -1;
}
return 0;
}
@@ -79,11 +78,7 @@ template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
ITEM const & i)
{
- Write_Guard ace_mon (this->mutex_,
- this->cond_,
- this->pending_writes_,
- this->writing_,
- this->collection_);
+ Write_Guard ace_mon (*this);
return this->bind_i (ace_mon, k, i);
}
@@ -91,19 +86,15 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k)
{
- Write_Guard ace_mon (this->mutex_,
- this->cond_,
- this->pending_writes_,
- this->writing_,
- this->collection_);
+ Write_Guard ace_mon (*this);
return this->unbind_i (ace_mon, k);
}
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon,
- KEY const & k,
- ITEM const & i)
+ KEY const & k,
+ ITEM const & i)
{
return ace_mon.copy->collection.bind (k, i);
}
@@ -118,18 +109,25 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon,
// ****************************************************************
template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>::ACE_RMCast_Copy_On_Write_Container (void)
+ : pending_writes_ (0)
+ , writing_ (0)
+ , cond_ (mutex_)
+{
+ ACE_NEW (this->collection_, Collection);
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m,
- ACE_SYNCH_CONDITION &c,
- int &p,
- int &w,
- Collection*& cr)
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container)
: copy (0)
- , mutex (m)
- , cond (c)
- , pending_writes (p)
- , writing_flag (w)
- , collection (cr)
+ , mutex (container.mutex_)
+ , cond (container.cond_)
+ , pending_writes (container.pending_writes_)
+ , writing_flag (container.writing_)
+ , collection (container.collection_)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
@@ -168,6 +166,8 @@ ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
this->cond.signal ();
}
// Delete outside the mutex, because it may take a long time.
+ // @@ Is this right? What happens if several readers are still
+ // using the old copy?
tmp->_decr_refcnt ();
}
diff --git a/ace/RMCast/RMCast_Copy_On_Write.h b/ace/RMCast/RMCast_Copy_On_Write.h
index 8724e23a5d5..e06aa5f1689 100644
--- a/ace/RMCast/RMCast_Copy_On_Write.h
+++ b/ace/RMCast/RMCast_Copy_On_Write.h
@@ -57,6 +57,47 @@ private:
// ****************************************************************
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Write_Guard;
+
+//! Base class for the Copy_On_Write collection, used to simplify the
+//! declaration of the Write_Guard
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Container
+{
+public:
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Container (void);
+
+ //! Let the Write_Guard access the internal fields.
+ friend ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>;
+
+ //! A shorter name for the actual collection type
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+protected:
+ //! Number of pending writes
+ int pending_writes_;
+
+ //! If non-zero then a thread is changing the collection.
+ /*!
+ * Many threads can use the collection simulatenously, but only one
+ * change it.
+ */
+ int writing_;
+
+ //! A mutex to serialize access to the collection pointer.
+ ACE_SYNCH_MUTEX mutex_;
+
+ //! A condition variable to wait to synchronize multiple writers.
+ ACE_SYNCH_CONDITION cond_;
+
+ //! The collection, with reference counting added
+ Collection *collection_;
+};
+
+// ****************************************************************
+
//! Implement the write guard for a reference counted collecion
/*!
* This helper class atomically increments the reference count of a
@@ -70,11 +111,7 @@ public:
typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
//! Constructor
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex,
- ACE_SYNCH_CONDITION &cond,
- int &pending_writes,
- int &writing_flag,
- Collection*& collection);
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container);
//! Destructor
~ACE_RMCast_Copy_On_Write_Write_Guard (void);
@@ -100,11 +137,20 @@ private:
Collection *&collection;
};
+
// ****************************************************************
//! Implement a copy on write wrapper for a map-like collection
+/*
+ *
+ * <B>WARNING: </B> This class may be moved away in the future, I'm
+ * investigating how it could be converted into a reusable component
+ * in ACE. I won't make promises on when will that happen, but I
+ * won't promise that it will stay here either.
+ *
+ */
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
-class ACE_RMCast_Copy_On_Write
+class ACE_RMCast_Copy_On_Write : public ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>
{
public:
//! The Read_Guard trait
@@ -136,25 +182,6 @@ public:
//! Unbind assuming the Write_Guard is held
int unbind_i (Write_Guard &guard, KEY const & key);
-
- //! Number of pending writes
- int pending_writes_;
-
- //! If non-zero then a thread is changing the collection.
- /*!
- * Many threads can use the collection simulatenously, but only one
- * change it.
- */
- int writing_;
-
- //! A mutex to serialize access to the collection pointer.
- ACE_SYNCH_MUTEX mutex_;
-
- //! A condition variable to wait to synchronize multiple writers.
- ACE_SYNCH_CONDITION cond_;
-
- //! The collection, with reference counting added
- Collection *collection_;
};
#if defined (__ACE_INLINE__)
diff --git a/ace/RMCast/RMCast_Copy_On_Write.i b/ace/RMCast/RMCast_Copy_On_Write.i
index c6e5099cda5..354dd51bf5a 100644
--- a/ace/RMCast/RMCast_Copy_On_Write.i
+++ b/ace/RMCast/RMCast_Copy_On_Write.i
@@ -33,4 +33,3 @@ ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
}
// ****************************************************************
-
diff --git a/ace/RMCast/RMCast_Reordering.cpp b/ace/RMCast/RMCast_Reordering.cpp
new file mode 100644
index 00000000000..f63c6c66f34
--- /dev/null
+++ b/ace/RMCast/RMCast_Reordering.cpp
@@ -0,0 +1,153 @@
+//
+// $Id$
+//
+
+#include "RMCast_Reordering.h"
+#include "RMCast_Proxy.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_Reordering.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Reordering, "$Id$")
+
+ACE_RMCast_Reordering::~ACE_RMCast_Reordering (void)
+{
+}
+
+int
+ACE_RMCast_Reordering::close (void)
+{
+ // @@
+ return 0;
+}
+
+int
+ACE_RMCast_Reordering::data (ACE_RMCast::Data &data)
+{
+ int must_ack = 0;
+ ACE_RMCast::Ack ack;
+
+ // ACE_DEBUG ((LM_DEBUG, "Received message (%d)\n", data.sequence_number));
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ if (data.sequence_number < this->next_expected_)
+ {
+ // Old message. Ack with the current status (look at the end
+ // of this block).
+ must_ack = 1;
+
+ // ACE_DEBUG ((LM_DEBUG, ".... old message is ignored\n"));
+ }
+
+ else if (data.sequence_number == this->next_expected_)
+ {
+ // ACE_DEBUG ((LM_DEBUG, ".... message is in order, received\n"));
+
+ // Accept the message, the current thread will dispatch it, so
+ // it is marked as accepted (using the <next_expected> field).
+ // Any other thread will not push that message because now it
+ // is "old".
+
+ this->next_expected_++;
+
+ // Right message, process as many messages as possible from
+ // the queue, then ack the right level...
+
+ // NOTE: we cannot release the mutex while dispatching
+ // events, otherwise: how do we stop other threads from
+ // delivering messages out of order? I.E. what if the
+ // next thread receives the next message?
+ if (this->next () != 0)
+ (void) this->next ()->data (data);
+
+ // After delivering one message there may be more messages
+ // pending
+ this->push_queued_messages ();
+
+ //@@ This should be strategized, for example, only Ack if
+ // there is a message out of order or something, otherwise
+ // continue with happiness. That works well for "optimistic
+ // models".
+ must_ack = 1;
+ }
+
+ else
+ {
+ // ACE_DEBUG ((LM_DEBUG, ".... message out of sequence, saved\n"));
+
+ // Out of sequence.
+ if (this->highest_received_ < data.sequence_number)
+ {
+ this->highest_received_ = data.sequence_number;
+ }
+ (void) this->messages_.bind (data.sequence_number, data);
+ // re-ack, otherwise save it and ack.
+ }
+
+ ack.next_expected = this->next_expected_;
+ ack.highest_received = this->highest_received_;
+ }
+
+ if (!must_ack || data.source == 0)
+ return 0;
+ return data.source->reply_ack (ack);
+}
+
+int
+ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+ if (this->next_expected_ >= ack_join.next_sequence_number)
+ {
+ // Nothing to do in this case...
+ return 0;
+ }
+
+ Messages_Iterator i = this->messages_.begin ();
+ Messages_Iterator end = this->messages_.end ();
+
+ while (i != end
+ && (*i).key () < ack_join.next_sequence_number)
+ {
+ this->messages_.unbind ((*i).key ());
+ i = this->messages_.begin ();
+ }
+
+ this->next_expected_ = ack_join.next_sequence_number;
+ if (this->highest_received_ < ack_join.next_sequence_number)
+ this->highest_received_ = ack_join.next_sequence_number;
+
+ this->push_queued_messages ();
+ }
+
+ return 0;
+}
+
+void
+ACE_RMCast_Reordering::push_queued_messages (void)
+{
+ Messages_Iterator i = this->messages_.begin ();
+ Messages_Iterator end = this->messages_.end ();
+
+ while (i != end
+ && (*i).key () == this->next_expected_)
+ {
+ if (this->next () != 0)
+ {
+ ACE_RMCast::Data data = (*i).item ();
+ this->next ()->data (data);
+ }
+
+ this->messages_.unbind ((*i).key ());
+ i = this->messages_.begin ();
+ this->next_expected_++;
+ }
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_Reordering.h b/ace/RMCast/RMCast_Reordering.h
new file mode 100644
index 00000000000..0f6c777913c
--- /dev/null
+++ b/ace/RMCast/RMCast_Reordering.h
@@ -0,0 +1,94 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace/RMCast
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_REORDERING_H
+#define ACE_RMCAST_REORDERING_H
+#include "ace/pre.h"
+
+#include "RMCast_Module.h"
+#include "ace/RB_Tree.h"
+#include "ace/Synch.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Proxy;
+
+//! Pass messages up in sent order
+/*!
+ * Some applications require receivers to process messages in the same
+ * order that messages are sent. This module buffers out of order
+ * messages and only delivers a message if:
+ * - All the previous messages have been delivered.
+ * - The sender sends a notification that previous messages will not
+ * be resent.
+ *
+ * The module also sends the Ack feedback to the sender.
+ *
+ * NOTE: This is not the same as causal or total ordering, that could
+ * be implemented someday, but requires a lot more than what we have
+ * right now.
+ *
+ */
+class ACE_RMCast_Export ACE_RMCast_Reordering : public ACE_RMCast_Module
+{
+public:
+ //! Constructor
+ ACE_RMCast_Reordering (void);
+
+ //! Destructor
+ virtual ~ACE_RMCast_Reordering (void);
+
+ //! Use a Red-Black Tree to keep the queue of messages
+ typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages;
+ typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages_Iterator;
+
+ //! Remove messages still pending
+ virtual int close (void);
+
+ //! Process a Data message.
+ /*!
+ * Process a Data message, sending the right Ack message back.
+ * The message is passed up only if it is in order.
+ */
+ virtual int data (ACE_RMCast::Data &);
+
+ //! During the join process the server informs us of the next
+ //! expected message
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+
+private:
+ //! Push any messages that are pending in the queue
+ void push_queued_messages (void);
+
+protected:
+ //! The reordering buffer
+ Messages messages_;
+
+ //! The smallest value of \param next_expected for all the proxies
+ ACE_UINT32 next_expected_;
+
+ //! The highest value of \param highest_received for all the proxies
+ ACE_UINT32 highest_received_;
+
+ //! Synchronization
+ ACE_SYNCH_MUTEX mutex_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Reordering.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_REORDERING_H */
diff --git a/ace/RMCast/RMCast_Reordering.i b/ace/RMCast/RMCast_Reordering.i
new file mode 100644
index 00000000000..ccbf852bc67
--- /dev/null
+++ b/ace/RMCast/RMCast_Reordering.i
@@ -0,0 +1,8 @@
+// $Id$
+
+ACE_INLINE
+ACE_RMCast_Reordering::ACE_RMCast_Reordering (void)
+ : next_expected_ (0)
+ , highest_received_ (0)
+{
+}
diff --git a/ace/RMCast/RMCast_Retransmission.cpp b/ace/RMCast/RMCast_Retransmission.cpp
index 7e38cdf7c97..4d40a59ac5e 100644
--- a/ace/RMCast/RMCast_Retransmission.cpp
+++ b/ace/RMCast/RMCast_Retransmission.cpp
@@ -33,9 +33,9 @@ public:
{
if (key > this->max_sequence_number_)
return 0;
- ACE_DEBUG ((LM_DEBUG,
- " Retransmission::resend - message %d resent\n",
- key));
+ // ACE_DEBUG ((LM_DEBUG,
+ // " Retransmission::resend - message %d resent\n",
+ // key));
ACE_RMCast::Data data = item;
int r = this->next_->data (data);
if (r != 0)
@@ -138,9 +138,9 @@ public:
{
if (key >= this->ack_.next_expected)
return 0;
- ACE_DEBUG ((LM_DEBUG,
- " Retransmission::ack - message %d erased\n",
- key));
+ // ACE_DEBUG ((LM_DEBUG,
+ // " Retransmission::ack - message %d erased\n",
+ // key));
return this->messages_->unbind_i (this->ace_mon_, key);
}
@@ -159,11 +159,7 @@ private:
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- Messages::Write_Guard ace_mon (this->messages_.mutex_,
- this->messages_.cond_,
- this->messages_.pending_writes_,
- this->messages_.writing_,
- this->messages_.collection_);
+ Messages::Write_Guard ace_mon (this->messages_);
ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
@@ -179,6 +175,7 @@ template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Container<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile
index e97cd885493..868795fa82c 100644
--- a/protocols/ace/RMCast/Makefile
+++ b/protocols/ace/RMCast/Makefile
@@ -18,6 +18,7 @@ FILES= \
RMCast_Proxy \
RMCast_Membership \
RMCast_Retransmission \
+ RMCast_Reordering \
\
RMCast_IO_UDP \
RMCast_UDP_Event_Handler \
@@ -710,6 +711,103 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
$(ACE_ROOT)/ace/Message_Block_T.i \
$(ACE_ROOT)/ace/Message_Block_T.cpp
+.obj/RMCast_Reordering.o .obj/RMCast_Reordering.so .shobj/RMCast_Reordering.o .shobj/RMCast_Reordering.so: RMCast_Reordering.cpp RMCast_Reordering.h \
+ $(ACE_ROOT)/ace/pre.h \
+ RMCast_Module.h RMCast.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/svc_export.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/OS.i \
+ RMCast_Export.h RMCast.i RMCast_Module.i RMCast_Copy_On_Write.h \
+ RMCast_Worker.h RMCast_Worker.i RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ RMCast_Copy_On_Write.i RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.i \
+ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \
+ $(ACE_ROOT)/ace/Based_Pointer_Repository.h \
+ $(ACE_ROOT)/ace/Singleton.h \
+ $(ACE_ROOT)/ace/Singleton.i \
+ $(ACE_ROOT)/ace/Singleton.cpp \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Managed_Object.cpp \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Free_List.cpp \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Malloc_T.cpp \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Containers_T.h \
+ $(ACE_ROOT)/ace/Containers_T.i \
+ $(ACE_ROOT)/ace/Containers_T.cpp \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/RB_Tree.cpp \
+ RMCast_Reordering.i RMCast_Proxy.h RMCast_Proxy.i \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/Message_Block_T.h \
+ $(ACE_ROOT)/ace/Message_Block_T.i \
+ $(ACE_ROOT)/ace/Message_Block_T.cpp
+
.obj/RMCast_IO_UDP.o .obj/RMCast_IO_UDP.so .shobj/RMCast_IO_UDP.o .shobj/RMCast_IO_UDP.so: RMCast_IO_UDP.cpp RMCast_IO_UDP.h \
$(ACE_ROOT)/ace/pre.h \
RMCast_Module.h RMCast.h \
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h
index df3a0d48858..55e9b5d0368 100644
--- a/protocols/ace/RMCast/RMCast.h
+++ b/protocols/ace/RMCast/RMCast.h
@@ -207,7 +207,7 @@ public:
*/
struct Ack_Join
{
- ACE_INT32 next_sequence_number;
+ ACE_UINT32 next_sequence_number;
//! Pass the proxy source between layers
ACE_RMCast_Proxy *source;
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
index f1553c7f4ab..20a76e559e0 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp
@@ -39,11 +39,8 @@ ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR>::_decr_refcnt (void)
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
ACE_RMCast_Copy_On_Write (void)
- : pending_writes_ (0)
- , writing_ (0)
- , cond_ (mutex_)
+ : ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> ()
{
- ACE_NEW (this->collection_, Collection);
}
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
@@ -69,8 +66,10 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,COLLECTION,ITERATOR>::
for (ITERATOR i = ace_mon.collection->collection.begin (); i != end; ++i)
{
int r = worker->work ((*i).key (), (*i).item ());
- if (r != 0)
- return r;
+ if (r == 1)
+ return 0; // Abort loop, but no error
+ if (r == -1)
+ return -1;
}
return 0;
}
@@ -79,11 +78,7 @@ template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
ITEM const & i)
{
- Write_Guard ace_mon (this->mutex_,
- this->cond_,
- this->pending_writes_,
- this->writing_,
- this->collection_);
+ Write_Guard ace_mon (*this);
return this->bind_i (ace_mon, k, i);
}
@@ -91,19 +86,15 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k,
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind (KEY const & k)
{
- Write_Guard ace_mon (this->mutex_,
- this->cond_,
- this->pending_writes_,
- this->writing_,
- this->collection_);
+ Write_Guard ace_mon (*this);
return this->unbind_i (ace_mon, k);
}
template<class KEY, class ITEM, class C, class I> int
ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind_i (Write_Guard &ace_mon,
- KEY const & k,
- ITEM const & i)
+ KEY const & k,
+ ITEM const & i)
{
return ace_mon.copy->collection.bind (k, i);
}
@@ -118,18 +109,25 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::unbind_i (Write_Guard &ace_mon,
// ****************************************************************
template<class COLLECTION, class ITERATOR>
+ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>::ACE_RMCast_Copy_On_Write_Container (void)
+ : pending_writes_ (0)
+ , writing_ (0)
+ , cond_ (mutex_)
+{
+ ACE_NEW (this->collection_, Collection);
+}
+
+// ****************************************************************
+
+template<class COLLECTION, class ITERATOR>
ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &m,
- ACE_SYNCH_CONDITION &c,
- int &p,
- int &w,
- Collection*& cr)
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container)
: copy (0)
- , mutex (m)
- , cond (c)
- , pending_writes (p)
- , writing_flag (w)
- , collection (cr)
+ , mutex (container.mutex_)
+ , cond (container.cond_)
+ , pending_writes (container.pending_writes_)
+ , writing_flag (container.writing_)
+ , collection (container.collection_)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex);
@@ -168,6 +166,8 @@ ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>::
this->cond.signal ();
}
// Delete outside the mutex, because it may take a long time.
+ // @@ Is this right? What happens if several readers are still
+ // using the old copy?
tmp->_decr_refcnt ();
}
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
index 8724e23a5d5..e06aa5f1689 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.h
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h
@@ -57,6 +57,47 @@ private:
// ****************************************************************
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Write_Guard;
+
+//! Base class for the Copy_On_Write collection, used to simplify the
+//! declaration of the Write_Guard
+template<class COLLECTION, class ITERATOR>
+class ACE_RMCast_Copy_On_Write_Container
+{
+public:
+ //! Constructor
+ ACE_RMCast_Copy_On_Write_Container (void);
+
+ //! Let the Write_Guard access the internal fields.
+ friend ACE_RMCast_Copy_On_Write_Write_Guard<COLLECTION,ITERATOR>;
+
+ //! A shorter name for the actual collection type
+ typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
+
+protected:
+ //! Number of pending writes
+ int pending_writes_;
+
+ //! If non-zero then a thread is changing the collection.
+ /*!
+ * Many threads can use the collection simulatenously, but only one
+ * change it.
+ */
+ int writing_;
+
+ //! A mutex to serialize access to the collection pointer.
+ ACE_SYNCH_MUTEX mutex_;
+
+ //! A condition variable to wait to synchronize multiple writers.
+ ACE_SYNCH_CONDITION cond_;
+
+ //! The collection, with reference counting added
+ Collection *collection_;
+};
+
+// ****************************************************************
+
//! Implement the write guard for a reference counted collecion
/*!
* This helper class atomically increments the reference count of a
@@ -70,11 +111,7 @@ public:
typedef ACE_RMCast_Copy_On_Write_Collection<COLLECTION,ITERATOR> Collection;
//! Constructor
- ACE_RMCast_Copy_On_Write_Write_Guard (ACE_SYNCH_MUTEX &mutex,
- ACE_SYNCH_CONDITION &cond,
- int &pending_writes,
- int &writing_flag,
- Collection*& collection);
+ ACE_RMCast_Copy_On_Write_Write_Guard (ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR> &container);
//! Destructor
~ACE_RMCast_Copy_On_Write_Write_Guard (void);
@@ -100,11 +137,20 @@ private:
Collection *&collection;
};
+
// ****************************************************************
//! Implement a copy on write wrapper for a map-like collection
+/*
+ *
+ * <B>WARNING: </B> This class may be moved away in the future, I'm
+ * investigating how it could be converted into a reusable component
+ * in ACE. I won't make promises on when will that happen, but I
+ * won't promise that it will stay here either.
+ *
+ */
template<class KEY, class ITEM, class COLLECTION, class ITERATOR>
-class ACE_RMCast_Copy_On_Write
+class ACE_RMCast_Copy_On_Write : public ACE_RMCast_Copy_On_Write_Container<COLLECTION,ITERATOR>
{
public:
//! The Read_Guard trait
@@ -136,25 +182,6 @@ public:
//! Unbind assuming the Write_Guard is held
int unbind_i (Write_Guard &guard, KEY const & key);
-
- //! Number of pending writes
- int pending_writes_;
-
- //! If non-zero then a thread is changing the collection.
- /*!
- * Many threads can use the collection simulatenously, but only one
- * change it.
- */
- int writing_;
-
- //! A mutex to serialize access to the collection pointer.
- ACE_SYNCH_MUTEX mutex_;
-
- //! A condition variable to wait to synchronize multiple writers.
- ACE_SYNCH_CONDITION cond_;
-
- //! The collection, with reference counting added
- Collection *collection_;
};
#if defined (__ACE_INLINE__)
diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.i b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
index c6e5099cda5..354dd51bf5a 100644
--- a/protocols/ace/RMCast/RMCast_Copy_On_Write.i
+++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.i
@@ -33,4 +33,3 @@ ACE_RMCast_Copy_On_Write_Read_Guard<COLLECTION,ITERATOR>::
}
// ****************************************************************
-
diff --git a/protocols/ace/RMCast/RMCast_Reordering.cpp b/protocols/ace/RMCast/RMCast_Reordering.cpp
new file mode 100644
index 00000000000..f63c6c66f34
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reordering.cpp
@@ -0,0 +1,153 @@
+//
+// $Id$
+//
+
+#include "RMCast_Reordering.h"
+#include "RMCast_Proxy.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_Reordering.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Reordering, "$Id$")
+
+ACE_RMCast_Reordering::~ACE_RMCast_Reordering (void)
+{
+}
+
+int
+ACE_RMCast_Reordering::close (void)
+{
+ // @@
+ return 0;
+}
+
+int
+ACE_RMCast_Reordering::data (ACE_RMCast::Data &data)
+{
+ int must_ack = 0;
+ ACE_RMCast::Ack ack;
+
+ // ACE_DEBUG ((LM_DEBUG, "Received message (%d)\n", data.sequence_number));
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ if (data.sequence_number < this->next_expected_)
+ {
+ // Old message. Ack with the current status (look at the end
+ // of this block).
+ must_ack = 1;
+
+ // ACE_DEBUG ((LM_DEBUG, ".... old message is ignored\n"));
+ }
+
+ else if (data.sequence_number == this->next_expected_)
+ {
+ // ACE_DEBUG ((LM_DEBUG, ".... message is in order, received\n"));
+
+ // Accept the message, the current thread will dispatch it, so
+ // it is marked as accepted (using the <next_expected> field).
+ // Any other thread will not push that message because now it
+ // is "old".
+
+ this->next_expected_++;
+
+ // Right message, process as many messages as possible from
+ // the queue, then ack the right level...
+
+ // NOTE: we cannot release the mutex while dispatching
+ // events, otherwise: how do we stop other threads from
+ // delivering messages out of order? I.E. what if the
+ // next thread receives the next message?
+ if (this->next () != 0)
+ (void) this->next ()->data (data);
+
+ // After delivering one message there may be more messages
+ // pending
+ this->push_queued_messages ();
+
+ //@@ This should be strategized, for example, only Ack if
+ // there is a message out of order or something, otherwise
+ // continue with happiness. That works well for "optimistic
+ // models".
+ must_ack = 1;
+ }
+
+ else
+ {
+ // ACE_DEBUG ((LM_DEBUG, ".... message out of sequence, saved\n"));
+
+ // Out of sequence.
+ if (this->highest_received_ < data.sequence_number)
+ {
+ this->highest_received_ = data.sequence_number;
+ }
+ (void) this->messages_.bind (data.sequence_number, data);
+ // re-ack, otherwise save it and ack.
+ }
+
+ ack.next_expected = this->next_expected_;
+ ack.highest_received = this->highest_received_;
+ }
+
+ if (!must_ack || data.source == 0)
+ return 0;
+ return data.source->reply_ack (ack);
+}
+
+int
+ACE_RMCast_Reordering::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+ if (this->next_expected_ >= ack_join.next_sequence_number)
+ {
+ // Nothing to do in this case...
+ return 0;
+ }
+
+ Messages_Iterator i = this->messages_.begin ();
+ Messages_Iterator end = this->messages_.end ();
+
+ while (i != end
+ && (*i).key () < ack_join.next_sequence_number)
+ {
+ this->messages_.unbind ((*i).key ());
+ i = this->messages_.begin ();
+ }
+
+ this->next_expected_ = ack_join.next_sequence_number;
+ if (this->highest_received_ < ack_join.next_sequence_number)
+ this->highest_received_ = ack_join.next_sequence_number;
+
+ this->push_queued_messages ();
+ }
+
+ return 0;
+}
+
+void
+ACE_RMCast_Reordering::push_queued_messages (void)
+{
+ Messages_Iterator i = this->messages_.begin ();
+ Messages_Iterator end = this->messages_.end ();
+
+ while (i != end
+ && (*i).key () == this->next_expected_)
+ {
+ if (this->next () != 0)
+ {
+ ACE_RMCast::Data data = (*i).item ();
+ this->next ()->data (data);
+ }
+
+ this->messages_.unbind ((*i).key ());
+ i = this->messages_.begin ();
+ this->next_expected_++;
+ }
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_Reordering.h b/protocols/ace/RMCast/RMCast_Reordering.h
new file mode 100644
index 00000000000..0f6c777913c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reordering.h
@@ -0,0 +1,94 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace/RMCast
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_REORDERING_H
+#define ACE_RMCAST_REORDERING_H
+#include "ace/pre.h"
+
+#include "RMCast_Module.h"
+#include "ace/RB_Tree.h"
+#include "ace/Synch.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Proxy;
+
+//! Pass messages up in sent order
+/*!
+ * Some applications require receivers to process messages in the same
+ * order that messages are sent. This module buffers out of order
+ * messages and only delivers a message if:
+ * - All the previous messages have been delivered.
+ * - The sender sends a notification that previous messages will not
+ * be resent.
+ *
+ * The module also sends the Ack feedback to the sender.
+ *
+ * NOTE: This is not the same as causal or total ordering, that could
+ * be implemented someday, but requires a lot more than what we have
+ * right now.
+ *
+ */
+class ACE_RMCast_Export ACE_RMCast_Reordering : public ACE_RMCast_Module
+{
+public:
+ //! Constructor
+ ACE_RMCast_Reordering (void);
+
+ //! Destructor
+ virtual ~ACE_RMCast_Reordering (void);
+
+ //! Use a Red-Black Tree to keep the queue of messages
+ typedef ACE_RB_Tree<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages;
+ typedef ACE_RB_Tree_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less_Than<ACE_UINT32>,ACE_Null_Mutex> Messages_Iterator;
+
+ //! Remove messages still pending
+ virtual int close (void);
+
+ //! Process a Data message.
+ /*!
+ * Process a Data message, sending the right Ack message back.
+ * The message is passed up only if it is in order.
+ */
+ virtual int data (ACE_RMCast::Data &);
+
+ //! During the join process the server informs us of the next
+ //! expected message
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+
+private:
+ //! Push any messages that are pending in the queue
+ void push_queued_messages (void);
+
+protected:
+ //! The reordering buffer
+ Messages messages_;
+
+ //! The smallest value of \param next_expected for all the proxies
+ ACE_UINT32 next_expected_;
+
+ //! The highest value of \param highest_received for all the proxies
+ ACE_UINT32 highest_received_;
+
+ //! Synchronization
+ ACE_SYNCH_MUTEX mutex_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Reordering.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_REORDERING_H */
diff --git a/protocols/ace/RMCast/RMCast_Reordering.i b/protocols/ace/RMCast/RMCast_Reordering.i
new file mode 100644
index 00000000000..ccbf852bc67
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Reordering.i
@@ -0,0 +1,8 @@
+// $Id$
+
+ACE_INLINE
+ACE_RMCast_Reordering::ACE_RMCast_Reordering (void)
+ : next_expected_ (0)
+ , highest_received_ (0)
+{
+}
diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp
index 7e38cdf7c97..4d40a59ac5e 100644
--- a/protocols/ace/RMCast/RMCast_Retransmission.cpp
+++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp
@@ -33,9 +33,9 @@ public:
{
if (key > this->max_sequence_number_)
return 0;
- ACE_DEBUG ((LM_DEBUG,
- " Retransmission::resend - message %d resent\n",
- key));
+ // ACE_DEBUG ((LM_DEBUG,
+ // " Retransmission::resend - message %d resent\n",
+ // key));
ACE_RMCast::Data data = item;
int r = this->next_->data (data);
if (r != 0)
@@ -138,9 +138,9 @@ public:
{
if (key >= this->ack_.next_expected)
return 0;
- ACE_DEBUG ((LM_DEBUG,
- " Retransmission::ack - message %d erased\n",
- key));
+ // ACE_DEBUG ((LM_DEBUG,
+ // " Retransmission::ack - message %d erased\n",
+ // key));
return this->messages_->unbind_i (this->ace_mon_, key);
}
@@ -159,11 +159,7 @@ private:
int
ACE_RMCast_Retransmission::ack (ACE_RMCast::Ack &ack)
{
- Messages::Write_Guard ace_mon (this->messages_.mutex_,
- this->messages_.cond_,
- this->messages_.pending_writes_,
- this->messages_.writing_,
- this->messages_.collection_);
+ Messages::Write_Guard ace_mon (this->messages_);
ACE_RMCast_Ack_Worker worker (ack, ace_mon, &this->messages_);
@@ -179,6 +175,7 @@ template class ACE_RB_Tree_Reverse_Iterator<ACE_UINT32,ACE_RMCast::Data,ACE_Less
template class ACE_RB_Tree_Node<ACE_UINT32,ACE_RMCast::Data>;
template class ACE_RMCast_Copy_On_Write<ACE_UINT32,ACE_RMCast::Data,ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
+template class ACE_RMCast_Copy_On_Write_Container<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Write_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Read_Guard<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
template class ACE_RMCast_Copy_On_Write_Collection<ACE_RMCast_Retransmission::Collection,ACE_RMCast_Retransmission::Collection_Iterator>;
diff --git a/tests/RMCast/Makefile b/tests/RMCast/Makefile
index 0741c7df36f..b6a6ab8b550 100644
--- a/tests/RMCast/Makefile
+++ b/tests/RMCast/Makefile
@@ -12,7 +12,8 @@ BIN = RMCast_Fragment_Test \
RMCast_Reassembly_Test \
RMCast_UDP_Best_Effort_Test \
RMCast_Membership_Test \
- RMCast_Retransmission_Test
+ RMCast_Retransmission_Test \
+ RMCast_Reordering_Test
PSRC=$(addsuffix .cpp,$(BIN))
LDLIBS = -lACE_RMCast
@@ -838,4 +839,166 @@ endif
$(ACE_ROOT)/ace/Stream_Modules.h \
$(ACE_ROOT)/ace/Stream_Modules.cpp
+.obj/RMCast_Reordering_Test.o .obj/RMCast_Reordering_Test.so .shobj/RMCast_Reordering_Test.o .shobj/RMCast_Reordering_Test.so: RMCast_Reordering_Test.cpp ../test_config.h \
+ $(ACE_ROOT)/ace/pre.h \
+ $(ACE_ROOT)/ace/post.h \
+ $(ACE_ROOT)/ace/ACE_export.h \
+ $(ACE_ROOT)/ace/svc_export.h \
+ $(ACE_ROOT)/ace/ace_wchar.h \
+ $(ACE_ROOT)/ace/OS.h \
+ $(ACE_ROOT)/ace/OS_Dirent.h \
+ $(ACE_ROOT)/ace/OS_Export.h \
+ $(ACE_ROOT)/ace/OS_Dirent.inl \
+ $(ACE_ROOT)/ace/OS_String.h \
+ $(ACE_ROOT)/ace/OS_String.inl \
+ $(ACE_ROOT)/ace/OS_Memory.h \
+ $(ACE_ROOT)/ace/OS_Memory.inl \
+ $(ACE_ROOT)/ace/OS_TLI.h \
+ $(ACE_ROOT)/ace/OS_TLI.inl \
+ $(ACE_ROOT)/ace/Min_Max.h \
+ $(ACE_ROOT)/ace/streams.h \
+ $(ACE_ROOT)/ace/Basic_Types.h \
+ $(ACE_ROOT)/ace/Basic_Types.i \
+ $(ACE_ROOT)/ace/Trace.h \
+ $(ACE_ROOT)/ace/OS.i \
+ $(ACE_ROOT)/ace/Singleton.h \
+ $(ACE_ROOT)/ace/Synch.h \
+ $(ACE_ROOT)/ace/ACE.h \
+ $(ACE_ROOT)/ace/ACE.i \
+ $(ACE_ROOT)/ace/Synch.i \
+ $(ACE_ROOT)/ace/Synch_T.h \
+ $(ACE_ROOT)/ace/Event_Handler.h \
+ $(ACE_ROOT)/ace/Event_Handler.i \
+ $(ACE_ROOT)/ace/Synch_T.i \
+ $(ACE_ROOT)/ace/Thread.h \
+ $(ACE_ROOT)/ace/Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.h \
+ $(ACE_ROOT)/ace/Base_Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread_Adapter.inl \
+ $(ACE_ROOT)/ace/Thread.i \
+ $(ACE_ROOT)/ace/Atomic_Op.i \
+ $(ACE_ROOT)/ace/Synch_T.cpp \
+ $(ACE_ROOT)/ace/Log_Msg.h \
+ $(ACE_ROOT)/ace/Log_Record.h \
+ $(ACE_ROOT)/ace/Log_Priority.h \
+ $(ACE_ROOT)/ace/Log_Record.i \
+ $(ACE_ROOT)/ace/Singleton.i \
+ $(ACE_ROOT)/ace/Singleton.cpp \
+ $(ACE_ROOT)/ace/Object_Manager.h \
+ $(ACE_ROOT)/ace/Object_Manager.i \
+ $(ACE_ROOT)/ace/Managed_Object.h \
+ $(ACE_ROOT)/ace/Managed_Object.i \
+ $(ACE_ROOT)/ace/Managed_Object.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Module.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Export.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Module.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Proxy.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Reordering.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Worker.h \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Worker.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Worker.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.i \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Copy_On_Write.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.h \
+ $(ACE_ROOT)/ace/Functor.h \
+ $(ACE_ROOT)/ace/Functor.i \
+ $(ACE_ROOT)/ace/Functor_T.h \
+ $(ACE_ROOT)/ace/Functor_T.i \
+ $(ACE_ROOT)/ace/Functor_T.cpp \
+ $(ACE_ROOT)/ace/RB_Tree.i \
+ $(ACE_ROOT)/ace/Malloc.h \
+ $(ACE_ROOT)/ace/Malloc_Base.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.h \
+ $(ACE_ROOT)/ace/Based_Pointer_T.i \
+ $(ACE_ROOT)/ace/Based_Pointer_T.cpp \
+ $(ACE_ROOT)/ace/Based_Pointer_Repository.h \
+ $(ACE_ROOT)/ace/Malloc.i \
+ $(ACE_ROOT)/ace/Malloc_T.h \
+ $(ACE_ROOT)/ace/Free_List.h \
+ $(ACE_ROOT)/ace/Free_List.i \
+ $(ACE_ROOT)/ace/Free_List.cpp \
+ $(ACE_ROOT)/ace/Malloc_T.i \
+ $(ACE_ROOT)/ace/Malloc_T.cpp \
+ $(ACE_ROOT)/ace/Memory_Pool.h \
+ $(ACE_ROOT)/ace/Signal.h \
+ $(ACE_ROOT)/ace/Containers.h \
+ $(ACE_ROOT)/ace/Containers.i \
+ $(ACE_ROOT)/ace/Containers_T.h \
+ $(ACE_ROOT)/ace/Containers_T.i \
+ $(ACE_ROOT)/ace/Containers_T.cpp \
+ $(ACE_ROOT)/ace/Signal.i \
+ $(ACE_ROOT)/ace/Mem_Map.h \
+ $(ACE_ROOT)/ace/Mem_Map.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(ACE_ROOT)/ace/Memory_Pool.i \
+ $(ACE_ROOT)/ace/RB_Tree.cpp \
+ $(ACE_ROOT)/ace/RMCast/RMCast_Reordering.i \
+ $(ACE_ROOT)/ace/Task.h \
+ $(ACE_ROOT)/ace/Service_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.h \
+ $(ACE_ROOT)/ace/Shared_Object.i \
+ $(ACE_ROOT)/ace/Service_Object.i \
+ $(ACE_ROOT)/ace/Thread_Manager.h \
+ $(ACE_ROOT)/ace/Thread_Manager.i \
+ $(ACE_ROOT)/ace/Task.i \
+ $(ACE_ROOT)/ace/Task_T.h \
+ $(ACE_ROOT)/ace/Message_Queue.h \
+ $(ACE_ROOT)/ace/Message_Block.h \
+ $(ACE_ROOT)/ace/Message_Block.i \
+ $(ACE_ROOT)/ace/Message_Block_T.h \
+ $(ACE_ROOT)/ace/Message_Block_T.i \
+ $(ACE_ROOT)/ace/Message_Block_T.cpp \
+ $(ACE_ROOT)/ace/IO_Cntl_Msg.h \
+ $(ACE_ROOT)/ace/Message_Queue_T.h \
+ $(ACE_ROOT)/ace/Message_Queue_T.i \
+ $(ACE_ROOT)/ace/Message_Queue_T.cpp \
+ $(ACE_ROOT)/ace/Strategies.h \
+ $(ACE_ROOT)/ace/Strategies_T.h \
+ $(ACE_ROOT)/ace/Service_Config.h \
+ $(ACE_ROOT)/ace/SString.h \
+ $(ACE_ROOT)/ace/SString.i \
+ $(ACE_ROOT)/ace/Service_Config.i \
+ $(ACE_ROOT)/ace/Reactor.h \
+ $(ACE_ROOT)/ace/Handle_Set.h \
+ $(ACE_ROOT)/ace/Handle_Set.i \
+ $(ACE_ROOT)/ace/Timer_Queue.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.h \
+ $(ACE_ROOT)/ace/Timer_Queue_T.i \
+ $(ACE_ROOT)/ace/Timer_Queue_T.cpp \
+ $(ACE_ROOT)/ace/Reactor.i \
+ $(ACE_ROOT)/ace/Reactor_Impl.h \
+ $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(ACE_ROOT)/ace/Synch_Options.h \
+ $(ACE_ROOT)/ace/Synch_Options.i \
+ $(ACE_ROOT)/ace/Hash_Map_Manager.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.h \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.i \
+ $(ACE_ROOT)/ace/Hash_Map_Manager_T.cpp \
+ $(ACE_ROOT)/ace/Strategies_T.i \
+ $(ACE_ROOT)/ace/Strategies_T.cpp \
+ $(ACE_ROOT)/ace/Service_Repository.h \
+ $(ACE_ROOT)/ace/Service_Types.h \
+ $(ACE_ROOT)/ace/Service_Types.i \
+ $(ACE_ROOT)/ace/Service_Repository.i \
+ $(ACE_ROOT)/ace/WFMO_Reactor.h \
+ $(ACE_ROOT)/ace/Process_Mutex.h \
+ $(ACE_ROOT)/ace/Process_Mutex.inl \
+ $(ACE_ROOT)/ace/WFMO_Reactor.i \
+ $(ACE_ROOT)/ace/Strategies.i \
+ $(ACE_ROOT)/ace/Message_Queue.i \
+ $(ACE_ROOT)/ace/Task_T.i \
+ $(ACE_ROOT)/ace/Task_T.cpp \
+ $(ACE_ROOT)/ace/Module.h \
+ $(ACE_ROOT)/ace/Module.i \
+ $(ACE_ROOT)/ace/Module.cpp \
+ $(ACE_ROOT)/ace/Stream_Modules.h \
+ $(ACE_ROOT)/ace/Stream_Modules.cpp
+
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/tests/RMCast/RMCast_Reordering_Test.cpp b/tests/RMCast/RMCast_Reordering_Test.cpp
new file mode 100644
index 00000000000..93b1c856826
--- /dev/null
+++ b/tests/RMCast/RMCast_Reordering_Test.cpp
@@ -0,0 +1,313 @@
+// $Id$
+
+#include "test_config.h"
+#include "ace/RMCast/RMCast_Proxy.h"
+#include "ace/RMCast/RMCast_Reordering.h"
+
+#include "ace/Task.h"
+
+ACE_RCSID(tests, RMCast_Reordering_Test, "$Id$")
+
+// ****************************************************************
+
+class Tester;
+
+//! Simple proxy for the ACE_RMCast_Reordering test harness
+/*!
+ * Implement a simple version of the ACE_RMCast_Proxy class used in
+ * the ACE_RMCast_Reordering test harness.
+ */
+class Test_Proxy : public ACE_RMCast_Proxy
+{
+public:
+ Test_Proxy (void);
+
+ void set_tester (Tester *tester)
+ {
+ this->tester_ = tester;
+ }
+
+ //! Most of the reply_ methods just return 0, there is no real remote
+ //! peer, this is just a test harness
+ //@{
+ virtual int reply_data (ACE_RMCast::Data &)
+ {
+ return 0;
+ }
+ virtual int reply_poll (ACE_RMCast::Poll &)
+ {
+ return 0;
+ }
+ virtual int reply_ack_join (ACE_RMCast::Ack_Join &)
+ {
+ return 0;
+ }
+ virtual int reply_ack_leave (ACE_RMCast::Ack_Leave &)
+ {
+ return 0;
+ }
+ //! Must check that that sequence number is reasonable
+ virtual int reply_ack (ACE_RMCast::Ack &);
+ virtual int reply_join (ACE_RMCast::Join &)
+ {
+ return 0;
+ }
+ virtual int reply_leave (ACE_RMCast::Leave &)
+ {
+ return 0;
+ }
+ //@}
+
+private:
+ //! Keep a reference to the main testing class so it can be called
+ //! back.
+ Tester *tester_;
+};
+
+// ****************************************************************
+
+//! A simple module to receive the messages from ACE_RMCast_Reordering
+/*!
+ * The ACE_RMCast_Reordering layer pushes messages to its next module
+ * when all the members have acked a message, when a new member joins,
+ * when a member leaves, etc.
+ * This class will verify that the messages are exactly what we
+ * expect.
+ */
+class Tester : public ACE_RMCast_Module
+{
+public:
+ Tester (void);
+
+ //! Run the test for \iterations times
+ void run (int iterations);
+
+ //! The proxy has received an Ack message, we need to validate it
+ int reply_ack (ACE_RMCast::Ack &ack);
+
+ //! Receive the data messages and validate their order
+ virtual int data (ACE_RMCast::Data &data);
+
+private:
+ //! Generate a new message to drive the test
+ void generate_messages (int count);
+
+private:
+ //! The array of proxies
+ Test_Proxy proxy_;
+
+ //! The Reordering layer
+ ACE_RMCast_Reordering reordering_;
+
+ //! The test is randomized to get better coverage. This is the seed
+ //! variable for the test
+ ACE_RANDR_TYPE seed_;
+
+ //! The lowest sequence number used to generate messages
+ ACE_UINT32 lowest_sequence_number_;
+
+ //! The next expected sequence number
+ ACE_UINT32 next_expected_;
+
+ //! Synchronization
+ ACE_SYNCH_MUTEX mutex_;
+};
+
+// ****************************************************************
+
+//! An Adapter to run Tester::run the test is a separate thread
+class Task : public ACE_Task_Base
+{
+public:
+ Task (Tester *tester);
+
+ // = Read the documentation in "ace/Task.h"
+ int svc (void);
+
+private:
+ //! The tester object.
+ Tester *tester_;
+};
+
+// ****************************************************************
+
+int
+main (int, ACE_TCHAR *[])
+{
+ ACE_START_TEST (ACE_TEXT ("RMCast_Reordering_Test"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("This is ACE Version %u.%u.%u\n\n"),
+ ACE::major_version(),
+ ACE::minor_version(),
+ ACE::beta_version()));
+
+ {
+ ACE_DEBUG ((LM_DEBUG, "Running single threaded test\n"));
+ //! Run the test in single threaded mode
+ Tester tester;
+ tester.run (100);
+ }
+ {
+ ACE_DEBUG ((LM_DEBUG, "Running multi threaded test\n"));
+ //! Run the test in multi-threaded mode
+ Tester tester;
+ Task task (&tester);
+ if (task.activate (THR_NEW_LWP|THR_JOINABLE, 4) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "Cannot activate the threads\n"), 1);
+ ACE_Thread_Manager::instance ()->wait ();
+ }
+
+ ACE_END_TEST;
+ return 0;
+}
+
+// ****************************************************************
+
+Tester::Tester (void)
+ : seed_ (ACE_static_cast(ACE_RANDR_TYPE,ACE_OS::gethrtime ()))
+ , lowest_sequence_number_ (0)
+ , next_expected_ (0)
+{
+ // Initialize the stack...
+ this->reordering_.next (this);
+
+ this->proxy_.set_tester (this);
+}
+
+void
+Tester::run (int iterations)
+{
+ ACE_RMCast::Ack_Join ack_join;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ ack_join.source = &this->proxy_;
+ ack_join.next_sequence_number = this->lowest_sequence_number_;
+ }
+ int result = this->reordering_.ack_join (ack_join);
+ if (result != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Reordering::ack_join returned %d\n",
+ result));
+ }
+
+ for (int i = 0; i < iterations; ++i)
+ {
+ // Push data
+ this->generate_messages (iterations / 10);
+ }
+ if (this->next_expected_ == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Tester::run - no messages received\n"));
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "Tester::run (%t) - %d messages received\n",
+ this->next_expected_));
+ }
+}
+
+int
+Tester::reply_ack (ACE_RMCast::Ack &ack)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ // ACE_DEBUG ((LM_DEBUG, "Received Ack message (%d, %d)\n",
+ // ack.next_expected, this->lowest_sequence_number_));
+
+ const int success_ratio = 95;
+ int c = ACE_OS::rand_r (this->seed_) % 100;
+
+ if (c < success_ratio)
+ {
+ this->lowest_sequence_number_ = ack.next_expected;
+ }
+ return 0;
+}
+
+void
+Tester::generate_messages (int count)
+{
+ ACE_Message_Block payload (1024);
+ payload.wr_ptr (1024);
+
+ ACE_RMCast::Data data;
+ data.source = &this->proxy_;
+ data.payload = &payload;
+
+ ACE_UINT32 min_sn;
+ ACE_UINT32 max_sn;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ min_sn = this->lowest_sequence_number_;
+ max_sn = this->lowest_sequence_number_ + count;
+ }
+ for (ACE_UINT32 i = min_sn; i != max_sn; ++i)
+ {
+ data.sequence_number = i;
+
+ const int success_ratio = 95;
+ int c;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->mutex_);
+ c = ACE_OS::rand_r (this->seed_) % 100;
+ }
+ if (c < success_ratio)
+ {
+ int result = this->reordering_.data (data);
+ if (result != 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Reordering::data returned %d\n",
+ result));
+ }
+ }
+ }
+}
+
+int
+Tester::data (ACE_RMCast::Data &data)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
+
+ if (this->next_expected_ != data.sequence_number)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Tester::data - out of sequence message (%d != %d)\n",
+ this->next_expected_,
+ data.sequence_number));
+ return -1;
+ }
+
+ this->next_expected_++;
+ return 0;
+}
+
+// ****************************************************************
+
+Task::Task (Tester *tester)
+ : tester_ (tester)
+{
+}
+
+int
+Task::svc (void)
+{
+ this->tester_->run (100);
+ return 0;
+}
+
+// ****************************************************************
+
+Test_Proxy::Test_Proxy (void)
+ : tester_ (0)
+{
+}
+
+int
+Test_Proxy::reply_ack (ACE_RMCast::Ack & ack)
+{
+ return this->tester_->reply_ack (ack);
+}
diff --git a/tests/RMCast/RMCast_Reordering_Test.dsp b/tests/RMCast/RMCast_Reordering_Test.dsp
new file mode 100644
index 00000000000..f3f5f32a9b8
--- /dev/null
+++ b/tests/RMCast/RMCast_Reordering_Test.dsp
@@ -0,0 +1,96 @@
+# Microsoft Developer Studio Project File - Name="RMCast_Reordering_Test" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=RMCast_Reordering_Test - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_Reordering_Test.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_Reordering_Test.mak" CFG="RMCast_Reordering_Test - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "RMCast_Reordering_Test - Win32 Release" (based on "Win32 (x86) Console Application")
+!MESSAGE "RMCast_Reordering_Test - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "RMCast_Reordering_Test - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\.." /I ".." /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /machine:I386
+# ADD LINK32 ace.lib ACE_RMCast.lib /nologo /subsystem:console /machine:I386 /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ELSEIF "$(CFG)" == "RMCast_Reordering_Test - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "Debug"
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\.." /I ".." /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 ACE_RMCastd.lib aced.lib /nologo /subsystem:console /debug /machine:I386 /out:"RMCast_Reordering_Test.exe" /pdbtype:sept /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ENDIF
+
+# Begin Target
+
+# Name "RMCast_Reordering_Test - Win32 Release"
+# Name "RMCast_Reordering_Test - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
+# Begin Source File
+
+SOURCE=.\RMCast_Reordering_Test.cpp
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/tests/RMCast/RMCast_Retransmission_Test.cpp b/tests/RMCast/RMCast_Retransmission_Test.cpp
index a40c0fd4177..90adb267539 100644
--- a/tests/RMCast/RMCast_Retransmission_Test.cpp
+++ b/tests/RMCast/RMCast_Retransmission_Test.cpp
@@ -209,7 +209,7 @@ main (int, ACE_TCHAR *[])
// ****************************************************************
Tester::Tester (void)
- : seed_ (ACE_OS::gethrtime ())
+ : seed_ (ACE_static_cast(ACE_RANDR_TYPE,ACE_OS::gethrtime ()))
, sequence_number_generator_ (0)
{
// Initialize the stack...
@@ -278,8 +278,8 @@ Tester::send_ack ()
}
if (!set)
return 0;
- ACE_DEBUG ((LM_DEBUG, "Tested::ack - (%d,%d)\n",
- ack.next_expected, ack.highest_received));
+ // ACE_DEBUG ((LM_DEBUG, "Tested::ack - (%d,%d)\n",
+ // ack.next_expected, ack.highest_received));
return this->retransmission_.ack (ack);
}
@@ -310,8 +310,8 @@ Tester::resend (void)
int r = this->retransmission_.resend (max_sequence_number);
if (r == -1)
{
- ACE_DEBUG ((LM_DEBUG,
- "Error returned from Retransmission::resend\n"));
+ // ACE_DEBUG ((LM_DEBUG,
+ // "Error returned from Retransmission::resend\n"));
}
return r;
}
@@ -372,8 +372,8 @@ Test_Proxy::Test_Proxy (void)
int
Test_Proxy::data (ACE_RMCast::Data &data)
{
- ACE_DEBUG ((LM_DEBUG, " (%t) Proxy receives message %d\n",
- data.sequence_number));
+ // ACE_DEBUG ((LM_DEBUG, " (%t) Proxy receives message %d\n",
+ // data.sequence_number));
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
int c = ACE_OS::rand_r (this->seed_) % 100;
@@ -386,7 +386,7 @@ Test_Proxy::data (ACE_RMCast::Data &data)
ACE_RMCast::Ack ack;
ack.next_expected = this->next_expected ();
ack.highest_received = this->highest_received ();
- ACE_DEBUG ((LM_DEBUG, "....it is an already accepted message\n"));
+ // ACE_DEBUG ((LM_DEBUG, "....it is an already accepted message\n"));
// Ack the message....
return this->ack (ack);
}
@@ -400,7 +400,7 @@ Test_Proxy::data (ACE_RMCast::Data &data)
{
// We ignore the message completely as if it was lost in the
// network
- ACE_DEBUG ((LM_DEBUG, "....and drops it\n"));
+ // ACE_DEBUG ((LM_DEBUG, "....and drops it\n"));
return 0;
}
@@ -414,20 +414,20 @@ Test_Proxy::data (ACE_RMCast::Data &data)
{
ack.highest_received = data.sequence_number;
}
- ACE_DEBUG ((LM_DEBUG, "....and accepts it\n"));
+ // ACE_DEBUG ((LM_DEBUG, "....and accepts it\n"));
// Ack the message....
return this->ack (ack);
}
- ACE_DEBUG ((LM_DEBUG, "....the message is out of order\n"));
+ // ACE_DEBUG ((LM_DEBUG, "....the message is out of order\n"));
// This is an out of sequence number, maybe it is lost...
if (c > success_ratio)
{
- ACE_DEBUG ((LM_DEBUG, "........and is dropped\n"));
+ // ACE_DEBUG ((LM_DEBUG, "........and is dropped\n"));
return 0;
}
- ACE_DEBUG ((LM_DEBUG, "........and is accepted\n"));
+ // ACE_DEBUG ((LM_DEBUG, "........and is accepted\n"));
ACE_RMCast::Ack ack;
ack.next_expected = this->next_expected ();
if (data.sequence_number < this->highest_received ())
diff --git a/tests/RMCast/RMCast_Tests.dsw b/tests/RMCast/RMCast_Tests.dsw
index e9e6820ccc2..0564812dbeb 100644
--- a/tests/RMCast/RMCast_Tests.dsw
+++ b/tests/RMCast/RMCast_Tests.dsw
@@ -39,6 +39,18 @@ Package=<4>
###############################################################################
+Project: "RMCast_Reordering_Test"=.\RMCast_Reordering_Test.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "RMCast_Retransmission_Test"=.\RMCast_Retransmission_Test.dsp - Package Owner=<4>
Package=<5>