summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog116
-rw-r--r--ChangeLogs/ChangeLog-02a116
-rw-r--r--ChangeLogs/ChangeLog-03a116
-rw-r--r--ace/RMCast/README57
-rw-r--r--ace/RMCast/RMCast.dsp86
-rw-r--r--ace/RMCast/RMCast.h28
-rw-r--r--ace/RMCast/RMCast_Fragment.cpp10
-rw-r--r--ace/RMCast/RMCast_Fragment.h2
-rw-r--r--ace/RMCast/RMCast_IO_UDP.cpp418
-rw-r--r--ace/RMCast/RMCast_IO_UDP.h (renamed from protocols/ace/RMCast/RMCast_UDP_Receiver.h)64
-rw-r--r--ace/RMCast/RMCast_IO_UDP.i9
-rw-r--r--ace/RMCast/RMCast_Module.cpp56
-rw-r--r--ace/RMCast/RMCast_Module.h8
-rw-r--r--ace/RMCast/RMCast_Module_Factory.cpp13
-rw-r--r--ace/RMCast/RMCast_Module_Factory.h50
-rw-r--r--ace/RMCast/RMCast_Module_Factory.i (renamed from ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i)0
-rw-r--r--ace/RMCast/RMCast_Reassembly.cpp6
-rw-r--r--ace/RMCast/RMCast_Reassembly.h2
-rw-r--r--ace/RMCast/RMCast_Sender_Proxy.cpp20
-rw-r--r--ace/RMCast/RMCast_Sender_Proxy.i7
-rw-r--r--ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp72
-rw-r--r--ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h53
-rw-r--r--ace/RMCast/RMCast_Sender_Proxy_Factory.cpp13
-rw-r--r--ace/RMCast/RMCast_Sender_Proxy_Factory.h55
-rw-r--r--ace/RMCast/RMCast_UDP_Event_Handler.cpp8
-rw-r--r--ace/RMCast/RMCast_UDP_Event_Handler.h8
-rw-r--r--ace/RMCast/RMCast_UDP_Event_Handler.i5
-rw-r--r--ace/RMCast/RMCast_UDP_Proxy.cpp136
-rw-r--r--ace/RMCast/RMCast_UDP_Proxy.h (renamed from ace/RMCast/RMCast_Sender_Proxy.h)40
-rw-r--r--ace/RMCast/RMCast_UDP_Proxy.i13
-rw-r--r--ace/RMCast/RMCast_UDP_Receiver.cpp241
-rw-r--r--ace/RMCast/RMCast_UDP_Receiver.i9
-rw-r--r--ace/RMCast/RMCast_UDP_Sender.cpp91
-rw-r--r--ace/RMCast/RMCast_UDP_Sender.h70
-rw-r--r--ace/RMCast/RMCast_UDP_Sender.i1
-rw-r--r--protocols/ace/RMCast/README57
-rw-r--r--protocols/ace/RMCast/RMCast.dsp86
-rw-r--r--protocols/ace/RMCast/RMCast.h28
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.cpp10
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.h2
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.cpp418
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.h (renamed from ace/RMCast/RMCast_UDP_Receiver.h)64
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.i9
-rw-r--r--protocols/ace/RMCast/RMCast_Module.cpp56
-rw-r--r--protocols/ace/RMCast/RMCast_Module.h8
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.cpp13
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.h50
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.i (renamed from ace/RMCast/RMCast_Sender_Proxy_Factory.i)0
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.cpp6
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.h2
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy.cpp20
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy.i7
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp72
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h53
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i1
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp13
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h55
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i1
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp8
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.h8
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.i5
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.cpp136
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.h (renamed from protocols/ace/RMCast/RMCast_Sender_Proxy.h)40
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.i13
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Receiver.cpp241
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Receiver.i9
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Sender.cpp91
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Sender.h70
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Sender.i1
-rw-r--r--tests/RMCast/RMCast_Fragment_Test.cpp10
-rw-r--r--tests/RMCast/RMCast_Reassembly_Test.cpp6
-rw-r--r--tests/RMCast/RMCast_Tests.dsw12
-rw-r--r--tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp146
-rw-r--r--tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp96
74 files changed, 2364 insertions, 1558 deletions
diff --git a/ChangeLog b/ChangeLog
index 4ef5833d17e..ce8fe63c5a5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,79 @@
+Mon Aug 21 08:58:19 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * Another iteration on the design. This time we use a single
+ Module to process IO events, the same class can be used on the
+ receiver and sender sides. The type of proxies is fixed, all
+ the variation is moved into the Modules, controlled by a module
+ factory.
+
+ * ace/RMCast/README:
+ Add a new README file.
+
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Module.cpp:
+ Modules can pass and process control messages as well as data
+ messages, add new methods for the control messages.
+
+ * ace/RMCast/RMCast_IO_UDP.h:
+ * ace/RMCast/RMCast_IO_UDP.i:
+ * ace/RMCast/RMCast_IO_UDP.cpp:
+ New class to manage all the IO events.
+
+ * ace/RMCast/RMCast_UDP_Event_Handler.h:
+ * ace/RMCast/RMCast_UDP_Event_Handler.i:
+ * ace/RMCast/RMCast_UDP_Event_Handler.cpp:
+ Modified to use the new RMCast_IO_UDP class.
+
+ * ace/RMCast/RMCast_UDP_Proxy.h:
+ * ace/RMCast/RMCast_UDP_Proxy.i:
+ * ace/RMCast/RMCast_UDP_Proxy.cpp:
+ New module used for both the client and server sides.
+
+ * ace/RMCast/RMCast_Module_Factory.h:
+ * ace/RMCast/RMCast_Module_Factory.i:
+ * ace/RMCast/RMCast_Module_Factory.cpp:
+ Create and destroy a module stack. Used by both the receiver
+ and sender sides to control the type of event processing they
+ can perform.
+
+ * ace/RMCast/RMCast.h:
+ Minor changes in the field names and comments.
+
+ * ace/RMCast/RMCast_Fragment.h:
+ * ace/RMCast/RMCast_Fragment.cpp:
+ * ace/RMCast/RMCast_Reassembly.h:
+ * ace/RMCast/RMCast_Reassembly.cpp:
+ Modified to use the new methods in the Module interface.
+
+ * tests/RMCast/RMCast_Fragment_Test.cpp:
+ * tests/RMCast/RMCast_Reassembly_Test.cpp:
+ * tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp:
+ Modified to use the new classes.
+
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp:
+ Add new project file for the UDP best effort test.
+
+ * ace/RMCast/RMCast.dsp:
+ Update the project file.
+
+ * ace/RMCast/RMCast_Sender_Proxy.h:
+ * ace/RMCast/RMCast_Sender_Proxy.i:
+ * ace/RMCast/RMCast_Sender_Proxy.cpp:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.h:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.i:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.cpp:
+ * ace/RMCast/RMCast_UDP_Receiver.h:
+ * ace/RMCast/RMCast_UDP_Receiver.i:
+ * ace/RMCast/RMCast_UDP_Receiver.cpp:
+ * ace/RMCast/RMCast_UDP_Sender.h:
+ * ace/RMCast/RMCast_UDP_Sender.i:
+ * ace/RMCast/RMCast_UDP_Sender.cpp:
+ Removed.
+
Fri Aug 18 12:36:21 2000 Steve Huston <shuston@riverace.com>
* ace/SUN_Proactor.cpp (find_completed_aio): Wrapped ACE_GUARD_RETURN
@@ -5,18 +81,18 @@ Fri Aug 18 12:36:21 2000 Steve Huston <shuston@riverace.com>
Thu Aug 17 05:48:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/config-win32-common.h: Wrapped the redefinition of FD_SETSIZE
- to #undef if it's already set to avoid a compiler/pre-processor
- warning. Thanks to Johnny Willemsen <Johnny.Willemsen@meco.nl>
- for contributing these.
+ * ace/config-win32-common.h: Wrapped the redefinition of FD_SETSIZE
+ to #undef if it's already set to avoid a compiler/pre-processor
+ warning. Thanks to Johnny Willemsen <Johnny.Willemsen@meco.nl>
+ for contributing these.
Wed Aug 16 06:03:11 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/Select_Reactor_Base.h: Make the notify_queue_lock_ in
- ACE_Select_Reactor_Notify an ACE_SYNCH_MUTEX rather than an
- ACE_SYNCH_RW_MUTEX to be consistent. Thanks to Ivan Murphy for
- reporting this.
-
+ * ace/Select_Reactor_Base.h: Make the notify_queue_lock_ in
+ ACE_Select_Reactor_Notify an ACE_SYNCH_MUTEX rather than an
+ ACE_SYNCH_RW_MUTEX to be consistent. Thanks to Ivan Murphy for
+ reporting this.
+
Tue Aug 15 20:54:14 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Logging_Strategy.cpp:
@@ -40,20 +116,20 @@ Fri Aug 11 17:49:36 2000 Steve Huston <shuston@riverace.com>
Fri Aug 11 16:24:13 2000 Shawn Hannan <hannan@tango.cs.wustl.edu>
- * ace/OS.i: For VXWORKS, changed checks of errno against
- S_objLib_OBJ_TIMEOUT to S_objLib_OBJ_UNAVAILABLE after calls
- to semTake with timeout parameter set to NO_WAIT (i.e., 0).
- semTake will set errno to _UNAVAILABLE, not _TIMEOUT, if
- the semaphore is empty and the timeout parameter is 0.
- Thanks to Peter Fischer <fischer@softec.de> for reporting
- this.
+ * ace/OS.i: For VXWORKS, changed checks of errno against
+ S_objLib_OBJ_TIMEOUT to S_objLib_OBJ_UNAVAILABLE after calls
+ to semTake with timeout parameter set to NO_WAIT (i.e., 0).
+ semTake will set errno to _UNAVAILABLE, not _TIMEOUT, if
+ the semaphore is empty and the timeout parameter is 0.
+ Thanks to Peter Fischer <fischer@softec.de> for reporting
+ this.
Fri Aug 11 13:20:40 2000 Nanbor Wang <nanbor@cs.wustl.edu>
- * apps/gperf/src/gperf_lib.dsp: Changed the format for generated
- debug info from C7 to "Program Database" to conform to rest of
- the ACE project files. Thanks to Espen Harlinn
- <espen.harlinn@seamos.no> for reporting this.
+ * apps/gperf/src/gperf_lib.dsp: Changed the format for generated
+ debug info from C7 to "Program Database" to conform to rest of
+ the ACE project files. Thanks to Espen Harlinn
+ <espen.harlinn@seamos.no> for reporting this.
Fri Aug 11 11:06:30 2000 David L. Levine <levine@cs.wustl.edu>
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index 4ef5833d17e..ce8fe63c5a5 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,79 @@
+Mon Aug 21 08:58:19 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * Another iteration on the design. This time we use a single
+ Module to process IO events, the same class can be used on the
+ receiver and sender sides. The type of proxies is fixed, all
+ the variation is moved into the Modules, controlled by a module
+ factory.
+
+ * ace/RMCast/README:
+ Add a new README file.
+
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Module.cpp:
+ Modules can pass and process control messages as well as data
+ messages, add new methods for the control messages.
+
+ * ace/RMCast/RMCast_IO_UDP.h:
+ * ace/RMCast/RMCast_IO_UDP.i:
+ * ace/RMCast/RMCast_IO_UDP.cpp:
+ New class to manage all the IO events.
+
+ * ace/RMCast/RMCast_UDP_Event_Handler.h:
+ * ace/RMCast/RMCast_UDP_Event_Handler.i:
+ * ace/RMCast/RMCast_UDP_Event_Handler.cpp:
+ Modified to use the new RMCast_IO_UDP class.
+
+ * ace/RMCast/RMCast_UDP_Proxy.h:
+ * ace/RMCast/RMCast_UDP_Proxy.i:
+ * ace/RMCast/RMCast_UDP_Proxy.cpp:
+ New module used for both the client and server sides.
+
+ * ace/RMCast/RMCast_Module_Factory.h:
+ * ace/RMCast/RMCast_Module_Factory.i:
+ * ace/RMCast/RMCast_Module_Factory.cpp:
+ Create and destroy a module stack. Used by both the receiver
+ and sender sides to control the type of event processing they
+ can perform.
+
+ * ace/RMCast/RMCast.h:
+ Minor changes in the field names and comments.
+
+ * ace/RMCast/RMCast_Fragment.h:
+ * ace/RMCast/RMCast_Fragment.cpp:
+ * ace/RMCast/RMCast_Reassembly.h:
+ * ace/RMCast/RMCast_Reassembly.cpp:
+ Modified to use the new methods in the Module interface.
+
+ * tests/RMCast/RMCast_Fragment_Test.cpp:
+ * tests/RMCast/RMCast_Reassembly_Test.cpp:
+ * tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp:
+ Modified to use the new classes.
+
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp:
+ Add new project file for the UDP best effort test.
+
+ * ace/RMCast/RMCast.dsp:
+ Update the project file.
+
+ * ace/RMCast/RMCast_Sender_Proxy.h:
+ * ace/RMCast/RMCast_Sender_Proxy.i:
+ * ace/RMCast/RMCast_Sender_Proxy.cpp:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.h:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.i:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.cpp:
+ * ace/RMCast/RMCast_UDP_Receiver.h:
+ * ace/RMCast/RMCast_UDP_Receiver.i:
+ * ace/RMCast/RMCast_UDP_Receiver.cpp:
+ * ace/RMCast/RMCast_UDP_Sender.h:
+ * ace/RMCast/RMCast_UDP_Sender.i:
+ * ace/RMCast/RMCast_UDP_Sender.cpp:
+ Removed.
+
Fri Aug 18 12:36:21 2000 Steve Huston <shuston@riverace.com>
* ace/SUN_Proactor.cpp (find_completed_aio): Wrapped ACE_GUARD_RETURN
@@ -5,18 +81,18 @@ Fri Aug 18 12:36:21 2000 Steve Huston <shuston@riverace.com>
Thu Aug 17 05:48:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/config-win32-common.h: Wrapped the redefinition of FD_SETSIZE
- to #undef if it's already set to avoid a compiler/pre-processor
- warning. Thanks to Johnny Willemsen <Johnny.Willemsen@meco.nl>
- for contributing these.
+ * ace/config-win32-common.h: Wrapped the redefinition of FD_SETSIZE
+ to #undef if it's already set to avoid a compiler/pre-processor
+ warning. Thanks to Johnny Willemsen <Johnny.Willemsen@meco.nl>
+ for contributing these.
Wed Aug 16 06:03:11 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/Select_Reactor_Base.h: Make the notify_queue_lock_ in
- ACE_Select_Reactor_Notify an ACE_SYNCH_MUTEX rather than an
- ACE_SYNCH_RW_MUTEX to be consistent. Thanks to Ivan Murphy for
- reporting this.
-
+ * ace/Select_Reactor_Base.h: Make the notify_queue_lock_ in
+ ACE_Select_Reactor_Notify an ACE_SYNCH_MUTEX rather than an
+ ACE_SYNCH_RW_MUTEX to be consistent. Thanks to Ivan Murphy for
+ reporting this.
+
Tue Aug 15 20:54:14 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Logging_Strategy.cpp:
@@ -40,20 +116,20 @@ Fri Aug 11 17:49:36 2000 Steve Huston <shuston@riverace.com>
Fri Aug 11 16:24:13 2000 Shawn Hannan <hannan@tango.cs.wustl.edu>
- * ace/OS.i: For VXWORKS, changed checks of errno against
- S_objLib_OBJ_TIMEOUT to S_objLib_OBJ_UNAVAILABLE after calls
- to semTake with timeout parameter set to NO_WAIT (i.e., 0).
- semTake will set errno to _UNAVAILABLE, not _TIMEOUT, if
- the semaphore is empty and the timeout parameter is 0.
- Thanks to Peter Fischer <fischer@softec.de> for reporting
- this.
+ * ace/OS.i: For VXWORKS, changed checks of errno against
+ S_objLib_OBJ_TIMEOUT to S_objLib_OBJ_UNAVAILABLE after calls
+ to semTake with timeout parameter set to NO_WAIT (i.e., 0).
+ semTake will set errno to _UNAVAILABLE, not _TIMEOUT, if
+ the semaphore is empty and the timeout parameter is 0.
+ Thanks to Peter Fischer <fischer@softec.de> for reporting
+ this.
Fri Aug 11 13:20:40 2000 Nanbor Wang <nanbor@cs.wustl.edu>
- * apps/gperf/src/gperf_lib.dsp: Changed the format for generated
- debug info from C7 to "Program Database" to conform to rest of
- the ACE project files. Thanks to Espen Harlinn
- <espen.harlinn@seamos.no> for reporting this.
+ * apps/gperf/src/gperf_lib.dsp: Changed the format for generated
+ debug info from C7 to "Program Database" to conform to rest of
+ the ACE project files. Thanks to Espen Harlinn
+ <espen.harlinn@seamos.no> for reporting this.
Fri Aug 11 11:06:30 2000 David L. Levine <levine@cs.wustl.edu>
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index 4ef5833d17e..ce8fe63c5a5 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,79 @@
+Mon Aug 21 08:58:19 2000 Carlos O'Ryan <coryan@uci.edu>
+
+ * Another iteration on the design. This time we use a single
+ Module to process IO events, the same class can be used on the
+ receiver and sender sides. The type of proxies is fixed, all
+ the variation is moved into the Modules, controlled by a module
+ factory.
+
+ * ace/RMCast/README:
+ Add a new README file.
+
+ * ace/RMCast/RMCast_Module.h:
+ * ace/RMCast/RMCast_Module.cpp:
+ Modules can pass and process control messages as well as data
+ messages, add new methods for the control messages.
+
+ * ace/RMCast/RMCast_IO_UDP.h:
+ * ace/RMCast/RMCast_IO_UDP.i:
+ * ace/RMCast/RMCast_IO_UDP.cpp:
+ New class to manage all the IO events.
+
+ * ace/RMCast/RMCast_UDP_Event_Handler.h:
+ * ace/RMCast/RMCast_UDP_Event_Handler.i:
+ * ace/RMCast/RMCast_UDP_Event_Handler.cpp:
+ Modified to use the new RMCast_IO_UDP class.
+
+ * ace/RMCast/RMCast_UDP_Proxy.h:
+ * ace/RMCast/RMCast_UDP_Proxy.i:
+ * ace/RMCast/RMCast_UDP_Proxy.cpp:
+ New module used for both the client and server sides.
+
+ * ace/RMCast/RMCast_Module_Factory.h:
+ * ace/RMCast/RMCast_Module_Factory.i:
+ * ace/RMCast/RMCast_Module_Factory.cpp:
+ Create and destroy a module stack. Used by both the receiver
+ and sender sides to control the type of event processing they
+ can perform.
+
+ * ace/RMCast/RMCast.h:
+ Minor changes in the field names and comments.
+
+ * ace/RMCast/RMCast_Fragment.h:
+ * ace/RMCast/RMCast_Fragment.cpp:
+ * ace/RMCast/RMCast_Reassembly.h:
+ * ace/RMCast/RMCast_Reassembly.cpp:
+ Modified to use the new methods in the Module interface.
+
+ * tests/RMCast/RMCast_Fragment_Test.cpp:
+ * tests/RMCast/RMCast_Reassembly_Test.cpp:
+ * tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp:
+ Modified to use the new classes.
+
+ * tests/RMCast/RMCast_Tests.dsw:
+ * tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp:
+ Add new project file for the UDP best effort test.
+
+ * ace/RMCast/RMCast.dsp:
+ Update the project file.
+
+ * ace/RMCast/RMCast_Sender_Proxy.h:
+ * ace/RMCast/RMCast_Sender_Proxy.i:
+ * ace/RMCast/RMCast_Sender_Proxy.cpp:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i:
+ * ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.h:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.i:
+ * ace/RMCast/RMCast_Sender_Proxy_Factory.cpp:
+ * ace/RMCast/RMCast_UDP_Receiver.h:
+ * ace/RMCast/RMCast_UDP_Receiver.i:
+ * ace/RMCast/RMCast_UDP_Receiver.cpp:
+ * ace/RMCast/RMCast_UDP_Sender.h:
+ * ace/RMCast/RMCast_UDP_Sender.i:
+ * ace/RMCast/RMCast_UDP_Sender.cpp:
+ Removed.
+
Fri Aug 18 12:36:21 2000 Steve Huston <shuston@riverace.com>
* ace/SUN_Proactor.cpp (find_completed_aio): Wrapped ACE_GUARD_RETURN
@@ -5,18 +81,18 @@ Fri Aug 18 12:36:21 2000 Steve Huston <shuston@riverace.com>
Thu Aug 17 05:48:51 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/config-win32-common.h: Wrapped the redefinition of FD_SETSIZE
- to #undef if it's already set to avoid a compiler/pre-processor
- warning. Thanks to Johnny Willemsen <Johnny.Willemsen@meco.nl>
- for contributing these.
+ * ace/config-win32-common.h: Wrapped the redefinition of FD_SETSIZE
+ to #undef if it's already set to avoid a compiler/pre-processor
+ warning. Thanks to Johnny Willemsen <Johnny.Willemsen@meco.nl>
+ for contributing these.
Wed Aug 16 06:03:11 2000 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
- * ace/Select_Reactor_Base.h: Make the notify_queue_lock_ in
- ACE_Select_Reactor_Notify an ACE_SYNCH_MUTEX rather than an
- ACE_SYNCH_RW_MUTEX to be consistent. Thanks to Ivan Murphy for
- reporting this.
-
+ * ace/Select_Reactor_Base.h: Make the notify_queue_lock_ in
+ ACE_Select_Reactor_Notify an ACE_SYNCH_MUTEX rather than an
+ ACE_SYNCH_RW_MUTEX to be consistent. Thanks to Ivan Murphy for
+ reporting this.
+
Tue Aug 15 20:54:14 2000 Darrell Brunsch <brunsch@uci.edu>
* ace/Logging_Strategy.cpp:
@@ -40,20 +116,20 @@ Fri Aug 11 17:49:36 2000 Steve Huston <shuston@riverace.com>
Fri Aug 11 16:24:13 2000 Shawn Hannan <hannan@tango.cs.wustl.edu>
- * ace/OS.i: For VXWORKS, changed checks of errno against
- S_objLib_OBJ_TIMEOUT to S_objLib_OBJ_UNAVAILABLE after calls
- to semTake with timeout parameter set to NO_WAIT (i.e., 0).
- semTake will set errno to _UNAVAILABLE, not _TIMEOUT, if
- the semaphore is empty and the timeout parameter is 0.
- Thanks to Peter Fischer <fischer@softec.de> for reporting
- this.
+ * ace/OS.i: For VXWORKS, changed checks of errno against
+ S_objLib_OBJ_TIMEOUT to S_objLib_OBJ_UNAVAILABLE after calls
+ to semTake with timeout parameter set to NO_WAIT (i.e., 0).
+ semTake will set errno to _UNAVAILABLE, not _TIMEOUT, if
+ the semaphore is empty and the timeout parameter is 0.
+ Thanks to Peter Fischer <fischer@softec.de> for reporting
+ this.
Fri Aug 11 13:20:40 2000 Nanbor Wang <nanbor@cs.wustl.edu>
- * apps/gperf/src/gperf_lib.dsp: Changed the format for generated
- debug info from C7 to "Program Database" to conform to rest of
- the ACE project files. Thanks to Espen Harlinn
- <espen.harlinn@seamos.no> for reporting this.
+ * apps/gperf/src/gperf_lib.dsp: Changed the format for generated
+ debug info from C7 to "Program Database" to conform to rest of
+ the ACE project files. Thanks to Espen Harlinn
+ <espen.harlinn@seamos.no> for reporting this.
Fri Aug 11 11:06:30 2000 David L. Levine <levine@cs.wustl.edu>
diff --git a/ace/RMCast/README b/ace/RMCast/README
new file mode 100644
index 00000000000..2dd0c5d9cfc
--- /dev/null
+++ b/ace/RMCast/README
@@ -0,0 +1,57 @@
+# $Id$
+
+ This directory will contain a simple, small-scale reliable
+multicast framework for ACE. The framework is based on the ASX
+components of the ACE library: the protocol is implemented as a stack
+of interchangeable "modules", each one in charge of a very small task.
+For example, one module implements fragmentation and reassembly, other
+modules implement retransmission, send ACK and NAK messages, and
+maintain receiver membership.
+
+ The modules are replaced to achieve different levels of
+reliability. For example, the retransmission module can be either the
+"Best_Effort", "Semi_Reliable" or "Reliable" implementation. In the
+first case no retransmissions are performed, but lost messages are
+detected and reported to the receiver. The "Semi_Reliable" case
+messages are held for a pre-specified amount of time, and
+re-transmited if requested, but it is possible to loose some messages
+if multiple re-transmissions fail. As in the "Best_Effort" case the
+lost messages are detected and flagged to the application. Finally
+in the "Reliable" mode the senders are flowed controlled until enough
+messages are successfully transmitted.
+
+ In general the stack looks like this:
+
+
+SENDER:
+
+----------------------------------------------------------------
+Buffering : Save lost messages
+Retransmission : Retransmit
+----------------------------------------------------------------
+Fragmentation : Fragment messages in smaller chunks
+Reassembly : and ensure that the IOVMAX limit is not
+ : reached
+----------------------------------------------------------------
+Tranport : Encapsulate the specific transport media
+ : such as TCP/IP, ATM, or shared memory
+ : Demuxes incoming data to the right chain
+ : Change control messages and data messages
+ : to the right dynamic types.
+----------------------------------------------------------------
+
+RECEIVER:
+
+----------------------------------------------------------------
+Lost detection : Detect lost messages and send control
+ : messages back
+----------------------------------------------------------------
+Reassembly : Reassemble messages, fragment control
+Fragmentation : data
+----------------------------------------------------------------
+Transport : Group membership, ACT reception,
+ : handle keep-alive messages...
+----------------------------------------------------------------
+
+
+@@ TODO: Piggybacking...
diff --git a/ace/RMCast/RMCast.dsp b/ace/RMCast/RMCast.dsp
index 53fdbf5cb9c..e3a84679364 100644
--- a/ace/RMCast/RMCast.dsp
+++ b/ace/RMCast/RMCast.dsp
@@ -94,14 +94,50 @@ LINK32=link.exe
# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
# Begin Source File
+SOURCE=.\RMCast.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_IO_UDP.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.cpp
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.cpp
+# End Source File
# End Group
# Begin Group "Header Files"
# PROP Default_Filter "h;hpp;hxx;hm;inl"
# Begin Source File
+SOURCE=.\RMCast.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Export.h
# End Source File
# Begin Source File
@@ -110,42 +146,76 @@ SOURCE=.\RMCast_Fragment.h
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.h
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.h
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.h
+# End Source File
# End Group
# Begin Group "Inline Files"
# PROP Default_Filter "i"
# Begin Source File
+SOURCE=.\RMCast.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Fragment.i
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.i
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.i
# End Source File
-# End Group
-# Begin Group "Template Files"
-
-# PROP Default_Filter ""
# Begin Source File
-SOURCE=.\RMCast_Fragment.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Event_Handler.i
# End Source File
# Begin Source File
-SOURCE=.\RMCast_Reassembly.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Proxy.i
# End Source File
# End Group
+# Begin Group "Template Files"
+
+# PROP Default_Filter ""
+# End Group
# End Target
# End Project
diff --git a/ace/RMCast/RMCast.h b/ace/RMCast/RMCast.h
index 025f82a1bfb..654f391204b 100644
--- a/ace/RMCast/RMCast.h
+++ b/ace/RMCast/RMCast.h
@@ -62,8 +62,8 @@ public:
// +---------+----------------------+
// | 32 bits | fragment_offset |
// +---------+----------------------+
- // | 32 bits | payload_size |
- // +---------+----------------------+
+ // ? ? ? ? ? | 32 bits | payload_size |
+ // ? ? ? ? ? +---------+----------------------+
// | | payload |
// +---------+----------------------+
//
@@ -84,9 +84,9 @@ public:
// +---------+----------------------+
// | 8 bits | MT_ACK |
// +---------+----------------------+
- // | 32 bits | last_successful |
+ // | 32 bits | highest_in_sequence |
// +---------+----------------------+
- // | 32 bits | last_received |
+ // | 32 bits | highest_received |
// +---------+----------------------+
//
@@ -196,26 +196,30 @@ public:
ACE_Message_Block *payload;
};
- struct Ack
+ struct Poll
{
- ACE_UINT32 expected;
- ACE_UINT32 last_received;
};
- struct Join
+ struct Ack_Join
{
+ ACE_INT32 next_sequence_number;
};
- struct Leave
+ struct Ack_Leave
{
};
- struct Ack_Join
+ struct Ack
{
- ACE_INT32 next_sequence_number;
+ ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 highest_received;
};
- struct Ack_Leave
+ struct Join
+ {
+ };
+
+ struct Leave
{
};
};
diff --git a/ace/RMCast/RMCast_Fragment.cpp b/ace/RMCast/RMCast_Fragment.cpp
index b3baee4f972..976def7a241 100644
--- a/ace/RMCast/RMCast_Fragment.cpp
+++ b/ace/RMCast/RMCast_Fragment.cpp
@@ -25,7 +25,7 @@ ACE_RMCast_Fragment::~ACE_RMCast_Fragment (void)
}
int
-ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
+ACE_RMCast_Fragment::data (ACE_RMCast::Data &received_data)
{
if (this->next () == 0)
return 0;
@@ -38,7 +38,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
// @@ We should keep the total size precomputed
data.total_size = mb->total_size ();
- // We must leave room for the header
+ // We must leave room for the header
#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
const int ACE_RMCAST_WRITEV_MAX = IOV_MAX - 2;
#else
@@ -126,7 +126,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
+ last_sent_mb_len);
data.payload = blocks;
- if (this->next ()->put_data (data) == -1)
+ if (this->next ()->data (data) == -1)
return -1;
// adjust the offset
@@ -172,7 +172,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
// have:
if (iovcnt == ACE_RMCAST_WRITEV_MAX)
{
- if (this->next ()->put_data (data) == -1)
+ if (this->next ()->data (data) == -1)
return -1;
iovcnt = 0;
@@ -184,5 +184,5 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
if (iovcnt == 0)
return 0;
- return this->next ()->put_data (data);
+ return this->next ()->data (data);
}
diff --git a/ace/RMCast/RMCast_Fragment.h b/ace/RMCast/RMCast_Fragment.h
index e42440b6c12..7b64d763ebc 100644
--- a/ace/RMCast/RMCast_Fragment.h
+++ b/ace/RMCast/RMCast_Fragment.h
@@ -40,7 +40,7 @@ public:
// feedback from the lower layer (transport?)
// = The ACE_RMCast_Module methods
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
size_t max_fragment_size_;
diff --git a/ace/RMCast/RMCast_IO_UDP.cpp b/ace/RMCast/RMCast_IO_UDP.cpp
new file mode 100644
index 00000000000..89cc7ae3c3a
--- /dev/null
+++ b/ace/RMCast/RMCast_IO_UDP.cpp
@@ -0,0 +1,418 @@
+//
+// $Id$
+//
+
+#include "RMCast_IO_UDP.h"
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module_Factory.h"
+#include "ace/Handle_Set.h"
+#include "ace/Reactor.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_IO_UDP.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_IO_UDP, "$Id$")
+
+ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void)
+{
+}
+
+int
+ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr,
+ int reuse_addr,
+ const ACE_TCHAR *net_if,
+ int protocol_family,
+ int protocol)
+{
+ this->mcast_group_ = mcast_addr;
+ return this->dgram_.subscribe (mcast_addr,
+ reuse_addr,
+ net_if,
+ protocol_family,
+ protocol);
+}
+
+int
+ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv)
+{
+ ACE_HANDLE h = this->dgram_.get_handle ();
+ if (h == ACE_INVALID_HANDLE)
+ return -1;
+
+ ACE_Handle_Set handle_set;
+ handle_set.set_bit (h);
+
+ ACE_Countdown_Time countdown (tv);
+
+ int r = ACE_OS::select (int(h) + 1,
+ handle_set, 0, 0,
+ tv);
+ if (r == -1)
+ {
+ if (errno == EINTR)
+ return 0;
+ else
+ return -1;
+ }
+ else if (r == 0)
+ {
+ return 0;
+ }
+
+ return this->handle_input (h);
+}
+
+int
+ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor)
+{
+ this->eh_.reactor (reactor);
+ return reactor->register_handler (&this->eh_,
+ ACE_Event_Handler::READ_MASK);
+}
+
+int
+ACE_RMCast_IO_UDP::remove_handlers (void)
+{
+ ACE_Reactor *r = this->eh_.reactor ();
+ if (r != 0)
+ {
+ r->remove_handler (&this->eh_,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ this->eh_.reactor (0);
+ }
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE)
+{
+ // @@ We should use a system constant instead of this literal
+ const int max_udp_packet_size = 65536;
+ char buffer[max_udp_packet_size];
+
+ ACE_INET_Addr from_address;
+ ssize_t r =
+ this->dgram_.recv (buffer, sizeof(buffer), from_address);
+
+ if (r == -1)
+ {
+ // @@ LOG??
+ ACE_DEBUG ((LM_DEBUG,
+ "RMCast_IO_UDP::handle_input () - "
+ "error in recv\n"));
+ return -1;
+ }
+
+ // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
+
+ // @@ Locking!
+
+ int type = buffer[0];
+
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ {
+ // @@ Log: invalid message type!!
+ // @@ TODO: should we return -1? The socket is still valid, it
+ // makes little sense to destroy it just because one remote
+ // sender is sending invalid messages. Maybe we should
+ // strategize this too, and report the problem to the
+ // application, this could indicate a misconfiguration or
+ // something worse...
+
+ // In any case the proxy should be destroyed, its peer is making
+ // something really wrong.
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.unbind (from_address, proxy) == 0)
+ {
+ this->factory_->destroy (proxy->module ());
+ delete proxy;
+ }
+ return 0;
+ }
+
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.find (from_address, proxy) != 0)
+ {
+ // State == RS_NON_EXISTENT
+
+ // @@ We should validate the message *before* creating the
+ // object, all we need is some sort of validation strategy, a
+ // different one for the receiver and another one for the
+ // sender.
+#if 0
+ if (type == ACE_RMCast::MT_ACK
+ || type == ACE_RMCast::MT_JOIN
+ || type == ACE_RMCast::MT_LEAVE
+ || type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ // All these message types indicate a problem, the should be
+ // generated by receivers, not received by them.
+ return 0;
+ }
+#endif /* 0 */
+
+ // The message type is valid, we must create a new proxy,
+ // initially in the JOINING state...
+ ACE_RMCast_Module *module = this->factory_->create (this);
+ if (module == 0)
+ {
+ // @@ LOG??
+ // Try to continue working, maybe the module can be created
+ // later.
+ return 0;
+ }
+ ACE_NEW_RETURN (proxy,
+ ACE_RMCast_UDP_Proxy(this,
+ from_address,
+ module),
+ 0);
+
+ if (this->map_.bind (from_address, proxy) != 0)
+ {
+ // @@ LOG??
+ return 0;
+ }
+
+ }
+
+ // Have the proxy process the message and do the right thing.
+ return proxy->receive_message (buffer, r);
+}
+
+ACE_HANDLE
+ACE_RMCast_IO_UDP::get_handle (void) const
+{
+ return this->dgram_.get_handle ();
+}
+
+int
+ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data)
+{
+ return this->send_data (data, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll)
+{
+ return this->send_poll (poll, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ return this->send_ack_join (ack_join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ return this->send_ack_leave (ack_leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack)
+{
+ return this->send_ack (ack, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join)
+{
+ return this->send_join (join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave)
+{
+ return this->send_leave (leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data,
+ const ACE_INET_Addr &to)
+{
+ // The first message block contains the header
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ ACE_UINT32 tmp;
+ char header[1 + 3 * sizeof(ACE_UINT32)];
+ header[0] = ACE_RMCast::MT_DATA;
+
+ tmp = ACE_HTONL (data.sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.total_size);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.fragment_offset);
+ ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 1;
+
+ iov[0].iov_base = header;
+ iov[0].iov_len = sizeof(header);
+
+ ACE_Message_Block *mb = data.payload;
+
+ for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
+ {
+ iov[iovcnt].iov_base = i->rd_ptr ();
+ iov[iovcnt].iov_len = i->length ();
+ iovcnt++;
+ if (iovcnt >= IOV_MAX)
+ return -1;
+ }
+
+ // @@ This pacing stuff here reduced the number of packet lost in
+ // loopback tests, but it should be taken out for real applications
+ // (or at least made configurable!)
+ ACE_Time_Value tv (0, 10000);
+ ACE_OS::sleep (tv);
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (iov, iovcnt, to) == -1)
+ return -1;
+
+#if 0
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
+#endif
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &poll,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_POLL;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_JOIN;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &ack_leave,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (ack.highest_received);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_JOIN;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>;
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.h b/ace/RMCast/RMCast_IO_UDP.h
index bfc56d89705..bdcccabe6e1 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Receiver.h
+++ b/ace/RMCast/RMCast_IO_UDP.h
@@ -10,35 +10,36 @@
//
// ============================================================================
-#ifndef ACE_RMCAST_UDP_RECEIVER_H
-#define ACE_RMCAST_UDP_RECEIVER_H
+#ifndef ACE_RMCAST_IO_UDP_H
+#define ACE_RMCAST_IO_UDP_H
#include "ace/pre.h"
+#include "RMCast_Module.h"
#include "RMCast_UDP_Event_Handler.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/Hash_Map_Manager.h"
#include "ace/Synch.h"
+#include "ace/INET_Addr.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_RMCast_Sender_Proxy;
-class ACE_RMCast_Sender_Proxy_Factory;
+class ACE_RMCast_UDP_Proxy;
+class ACE_RMCast_Module_Factory;
class ACE_Reactor;
class ACE_Time_Value;
-class ACE_INET_Addr;
-class ACE_RMCast_Export ACE_RMCast_UDP_Receiver
+class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module
{
public:
- ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory);
+ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory);
// Constructor
- // <factory> is used to create the Sender_Proxy and Modules that
- // process incoming messages.
- // The caller owns <factory>.
+ // <factory> is used to create the modules for each proxy that
+ // process incoming messages. The class does *not* assume ownership
+ // of <factory>, the caller owns it.
- ~ACE_RMCast_UDP_Receiver (void);
+ ~ACE_RMCast_IO_UDP (void);
// Destructor
int subscribe (const ACE_INET_Addr &mcast_addr,
@@ -69,25 +70,36 @@ public:
ACE_HANDLE get_handle (void) const;
// Obtain the handle for the underlying socket
-private:
- int send_join (ACE_INET_Addr &from);
- // Send a JOIN message
-
- int send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
- ACE_INET_Addr &from);
- // Send an ACK message
-
- int send_leave (ACE_INET_Addr &from);
- // Send a LEAVE messsage
+ // Send back to the remove object represented by <proxy>
+ int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &);
+ int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &);
+ int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &);
+ int send_ack_leave (ACE_RMCast::Ack_Leave &, const ACE_INET_Addr &);
+ int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &);
+ int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &);
+ int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &);
+
+ // = The RMCast_Module methods
+ virtual int data (ACE_RMCast::Data &);
+ virtual int poll (ACE_RMCast::Poll &);
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+ virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ virtual int ack (ACE_RMCast::Ack &);
+ virtual int join (ACE_RMCast::Join &);
+ virtual int leave (ACE_RMCast::Leave &);
+ // The messages are sent to the multicast group
private:
- ACE_RMCast_Sender_Proxy_Factory *factory_;
- // The factory used to create Sender proxies
+ ACE_RMCast_Module_Factory *factory_;
+ // The factory used to create the modules attached to each proxy
+
+ ACE_INET_Addr mcast_group_;
+ // The multicast group we subscribe and send to
ACE_SOCK_Dgram_Mcast dgram_;
// The socket
- typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex> Map;
+ typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map;
Map map_;
ACE_RMCast_UDP_Event_Handler eh_;
@@ -95,8 +107,8 @@ private:
};
#if defined (__ACE_INLINE__)
-#include "RMCast_UDP_Receiver.i"
+#include "RMCast_IO_UDP.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
-#endif /* ACE_RMCAST_UDP_RECEIVER_H */
+#endif /* ACE_RMCAST_IO_UDP_H */
diff --git a/ace/RMCast/RMCast_IO_UDP.i b/ace/RMCast/RMCast_IO_UDP.i
new file mode 100644
index 00000000000..ddacc5694ad
--- /dev/null
+++ b/ace/RMCast/RMCast_IO_UDP.i
@@ -0,0 +1,9 @@
+// $Id$
+
+ACE_INLINE
+ACE_RMCast_IO_UDP::
+ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory)
+ : factory_ (factory)
+ , eh_ (this)
+{
+}
diff --git a/ace/RMCast/RMCast_Module.cpp b/ace/RMCast/RMCast_Module.cpp
index b47694abe4d..632d905f900 100644
--- a/ace/RMCast/RMCast_Module.cpp
+++ b/ace/RMCast/RMCast_Module.cpp
@@ -55,3 +55,59 @@ ACE_RMCast_Module::close (void)
{
return 0;
}
+
+int
+ACE_RMCast_Module::data (ACE_RMCast::Data &data)
+{
+ if (this->next () != 0)
+ return this->next ()->data (data);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::poll (ACE_RMCast::Poll &poll)
+{
+ if (this->next () != 0)
+ return this->next ()->poll (poll);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ if (this->next () != 0)
+ return this->next ()->ack_join (ack_join);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ if (this->next () != 0)
+ return this->next ()->ack_leave (ack_leave);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack (ACE_RMCast::Ack &ack)
+{
+ if (this->next () != 0)
+ return this->next ()->ack (ack);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::join (ACE_RMCast::Join &join)
+{
+ if (this->next () != 0)
+ return this->next ()->join (join);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::leave (ACE_RMCast::Leave &leave)
+{
+ if (this->next () != 0)
+ return this->next ()->leave (leave);
+ return 0;
+}
diff --git a/ace/RMCast/RMCast_Module.h b/ace/RMCast/RMCast_Module.h
index 30f3da2f4fe..9f83c2d5be4 100644
--- a/ace/RMCast/RMCast_Module.h
+++ b/ace/RMCast/RMCast_Module.h
@@ -59,7 +59,13 @@ public:
virtual int close (void);
// Close the module.
- virtual int put_data (ACE_RMCast::Data &data) = 0;
+ virtual int data (ACE_RMCast::Data &);
+ virtual int poll (ACE_RMCast::Poll &);
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+ virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ virtual int ack (ACE_RMCast::Ack &);
+ virtual int join (ACE_RMCast::Join &);
+ virtual int leave (ACE_RMCast::Leave &);
// Push data down the stack
private:
diff --git a/ace/RMCast/RMCast_Module_Factory.cpp b/ace/RMCast/RMCast_Module_Factory.cpp
new file mode 100644
index 00000000000..b749048a78c
--- /dev/null
+++ b/ace/RMCast/RMCast_Module_Factory.cpp
@@ -0,0 +1,13 @@
+// $Id$
+
+#include "RMCast_Module_Factory.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_Module_Factory.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Module_Factory, "$Id$")
+
+ACE_RMCast_Module_Factory::~ACE_RMCast_Module_Factory (void)
+{
+}
diff --git a/ace/RMCast/RMCast_Module_Factory.h b/ace/RMCast/RMCast_Module_Factory.h
new file mode 100644
index 00000000000..722ad87d678
--- /dev/null
+++ b/ace/RMCast/RMCast_Module_Factory.h
@@ -0,0 +1,50 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// RMCast_Module_Factory.h
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_MODULE_FACTORY_H
+#define ACE_RMCAST_MODULE_FACTORY_H
+#include "ace/pre.h"
+
+#include "RMCast.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Module;
+class ACE_RMCast_IO_UDP;
+
+class ACE_RMCast_Export ACE_RMCast_Module_Factory
+{
+ // = DESCRIPTION
+ //
+public:
+ virtual ~ACE_RMCast_Module_Factory (void);
+ // Destructor
+
+ virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0;
+ // Create a new proxy
+
+ virtual void destroy (ACE_RMCast_Module *) = 0;
+ // Destroy a proxy
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Module_Factory.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_MODULE_FACTORY_H */
diff --git a/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i b/ace/RMCast/RMCast_Module_Factory.i
index cfa1da318d3..cfa1da318d3 100644
--- a/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i
+++ b/ace/RMCast/RMCast_Module_Factory.i
diff --git a/ace/RMCast/RMCast_Reassembly.cpp b/ace/RMCast/RMCast_Reassembly.cpp
index a52791e1ebf..ba2e9b79c1a 100644
--- a/ace/RMCast/RMCast_Reassembly.cpp
+++ b/ace/RMCast/RMCast_Reassembly.cpp
@@ -34,7 +34,7 @@ ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
}
int
-ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
+ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
{
if (this->next () == 0)
return 0;
@@ -42,7 +42,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
if (data.payload->length () + data.fragment_offset > data.total_size)
{
ACE_DEBUG ((LM_DEBUG,
- "RMCast_Reassembly::put_data - invalid size\n"));
+ "RMCast_Reassembly::data - invalid size\n"));
return -1; // Corrupt message?
}
@@ -92,7 +92,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
downstream_data.fragment_offset = 0;
downstream_data.payload = message->message_body ();
- int r = this->next ()->put_data (downstream_data);
+ int r = this->next ()->data (downstream_data);
delete message;
diff --git a/ace/RMCast/RMCast_Reassembly.h b/ace/RMCast/RMCast_Reassembly.h
index 0982d059c7c..0bf0c3a61ee 100644
--- a/ace/RMCast/RMCast_Reassembly.h
+++ b/ace/RMCast/RMCast_Reassembly.h
@@ -34,7 +34,7 @@ public:
// Destructor
// = The ACE_RMCast_Module methods
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
ACE_SYNCH_MUTEX mutex_;
diff --git a/ace/RMCast/RMCast_Sender_Proxy.cpp b/ace/RMCast/RMCast_Sender_Proxy.cpp
deleted file mode 100644
index ff1b7b33f15..00000000000
--- a/ace/RMCast/RMCast_Sender_Proxy.cpp
+++ /dev/null
@@ -1,20 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy.h"
-#include "RMCast_Module.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy, "$Id$")
-
-ACE_RMCast_Sender_Proxy::ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module)
- : module_ (module)
-{
-}
-
-ACE_RMCast_Sender_Proxy::~ACE_RMCast_Sender_Proxy (void)
-{
-}
diff --git a/ace/RMCast/RMCast_Sender_Proxy.i b/ace/RMCast/RMCast_Sender_Proxy.i
deleted file mode 100644
index b47573711ea..00000000000
--- a/ace/RMCast/RMCast_Sender_Proxy.i
+++ /dev/null
@@ -1,7 +0,0 @@
-// $Id$
-
-ACE_INLINE ACE_RMCast_Module *
-ACE_RMCast_Sender_Proxy::module (void) const
-{
- return this->module_;
-}
diff --git a/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp b/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp
deleted file mode 100644
index ba525f245bc..00000000000
--- a/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy_Best_Effort.h"
-#include "RMCast_Module.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy_Best_Effort.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy_Best_Effort, "$Id$")
-
-ACE_RMCast_Sender_Proxy_Best_Effort::
- ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module)
- : ACE_RMCast_Sender_Proxy (module)
-{
-}
-
-ACE_RMCast_Sender_Proxy_Best_Effort::
- ~ACE_RMCast_Sender_Proxy_Best_Effort (void)
-{
-}
-
-int
-ACE_RMCast_Sender_Proxy_Best_Effort::receive_message (char *buffer,
- size_t size)
-{
- int type = buffer[0];
-
- // All control messages are ignored...
- if (type != ACE_RMCast::MT_DATA)
- return 0;
-
- // @@ Push the event through the stack
-#if 0
- ACE_DEBUG ((LM_DEBUG,
- "Proxy(%x) - received data\n", long(this)));
- ACE_HEX_DUMP ((LM_DEBUG, buffer, header, "Proxy"));
-#endif
-
- const size_t header_size = 1 + 3 * sizeof(ACE_UINT32);
- if (size < header_size)
- {
- // The message is too small
- return 0;
- }
-
- ACE_UINT32 tmp;
-
- ACE_RMCast::Data data;
-
- ACE_OS::memcpy (&tmp, buffer + 1,
- sizeof(tmp));
- data.sequence_number = ACE_NTOHL (tmp);
-
- ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp),
- sizeof(tmp));
- data.total_size = ACE_NTOHL (tmp);
-
- ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp),
- sizeof(tmp));
- data.fragment_offset = ACE_NTOHL (tmp);
-
- // Pass it up the module...
- ACE_Message_Block *mb;
- ACE_NEW_RETURN (mb, ACE_Message_Block, -1);
- mb->size (size - header_size);
- mb->copy (buffer + header_size, size - header_size);
-
- data.payload = mb;
- return this->module ()->put_data (data);
-}
diff --git a/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h b/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h
deleted file mode 100644
index 304e026afc3..00000000000
--- a/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_Sender_Proxy_Best_Effort.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H
-#define ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H
-#include "ace/pre.h"
-
-#include "RMCast_Sender_Proxy.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Module;
-
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Best_Effort : public ACE_RMCast_Sender_Proxy
-{
- // = TITLE
- // Reliable Multicast Sender Ambassador
- //
- // = DESCRIPTION
- // Implement an Ambassador for the reliable multicast senders.
- //
-public:
- ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module);
- // Constructor
-
- ~ACE_RMCast_Sender_Proxy_Best_Effort (void);
- // Destructor
-
- virtual int receive_message (char *buffer, size_t size);
- // A DATA message was received.
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy_Best_Effort.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H */
diff --git a/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp b/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp
deleted file mode 100644
index 48a82b5dfbc..00000000000
--- a/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp
+++ /dev/null
@@ -1,13 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy_Factory.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy_Factory.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy_Factory, "$Id$")
-
-ACE_RMCast_Sender_Proxy_Factory::~ACE_RMCast_Sender_Proxy_Factory (void)
-{
-}
diff --git a/ace/RMCast/RMCast_Sender_Proxy_Factory.h b/ace/RMCast/RMCast_Sender_Proxy_Factory.h
deleted file mode 100644
index 7dff4d2796f..00000000000
--- a/ace/RMCast/RMCast_Sender_Proxy_Factory.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_Sender_Proxy_Factory.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_SENDER_PROXY_FACTORY_H
-#define ACE_RMCAST_SENDER_PROXY_FACTORY_H
-#include "ace/pre.h"
-
-#include "RMCast.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Sender_Proxy;
-
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Factory
-{
- // = DESCRIPTION
- // Defines the interface to create Sender_Proxies.
- // The application provides a Sender_Proxy_Factory, this is used
- // by the receiver side to create a different proxy for each
- // remote sender. The application configures the proxy with the
- // correct modules to process incoming events and achieve the
- // desired level of reliability.
- //
-public:
- virtual ~ACE_RMCast_Sender_Proxy_Factory (void);
- // Destructor
-
- virtual ACE_RMCast_Sender_Proxy *create (void) = 0;
- // Create a new proxy
-
- virtual void destroy (ACE_RMCast_Sender_Proxy *) = 0;
- // Destroy a proxy
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy_Factory.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_FACTORY_H */
diff --git a/ace/RMCast/RMCast_UDP_Event_Handler.cpp b/ace/RMCast/RMCast_UDP_Event_Handler.cpp
index 69cfc337113..e5ff8da2761 100644
--- a/ace/RMCast/RMCast_UDP_Event_Handler.cpp
+++ b/ace/RMCast/RMCast_UDP_Event_Handler.cpp
@@ -3,7 +3,7 @@
//
#include "RMCast_UDP_Event_Handler.h"
-#include "RMCast_UDP_Receiver.h"
+#include "RMCast_IO_UDP.h"
#if !defined (__ACE_INLINE__)
# include "RMCast_UDP_Event_Handler.i"
@@ -18,19 +18,19 @@ ACE_RMCast_UDP_Event_Handler::~ACE_RMCast_UDP_Event_Handler (void)
ACE_HANDLE
ACE_RMCast_UDP_Event_Handler::get_handle (void) const
{
- return this->receiver_->get_handle ();
+ return this->io_udp_->get_handle ();
}
int
ACE_RMCast_UDP_Event_Handler::handle_input (ACE_HANDLE h)
{
- return this->receiver_->handle_input (h);
+ return this->io_udp_->handle_input (h);
}
int
ACE_RMCast_UDP_Event_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
- // @@ return this->receiver_->handle_timeout ();
+ // @@ return this->io_udp_->handle_timeout ();
return 0;
}
diff --git a/ace/RMCast/RMCast_UDP_Event_Handler.h b/ace/RMCast/RMCast_UDP_Event_Handler.h
index 193d7038cd8..02798cee7f8 100644
--- a/ace/RMCast/RMCast_UDP_Event_Handler.h
+++ b/ace/RMCast/RMCast_UDP_Event_Handler.h
@@ -4,7 +4,7 @@
//
// = DESCRIPTION
// Implement an adapter between the ACE Reactor and the
-// ACE_RMCast_UDP_Receiver
+// ACE_RMCast_IO_UDP
//
// = AUTHOR
// Carlos O'Ryan <coryan@uci.edu>
@@ -22,13 +22,13 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_RMCast_UDP_Receiver;
+class ACE_RMCast_IO_UDP;
class ACE_INET_Addr;
class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler
{
public:
- ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver);
+ ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver);
// Constructor
~ACE_RMCast_UDP_Event_Handler (void);
@@ -41,7 +41,7 @@ public:
const void *act = 0);
private:
- ACE_RMCast_UDP_Receiver *receiver_;
+ ACE_RMCast_IO_UDP *io_udp_;
// The sender
};
diff --git a/ace/RMCast/RMCast_UDP_Event_Handler.i b/ace/RMCast/RMCast_UDP_Event_Handler.i
index b35aeefa3f4..99b4c0ac7e5 100644
--- a/ace/RMCast/RMCast_UDP_Event_Handler.i
+++ b/ace/RMCast/RMCast_UDP_Event_Handler.i
@@ -2,8 +2,7 @@
ACE_INLINE
ACE_RMCast_UDP_Event_Handler::
-ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver)
- : receiver_ (receiver)
+ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io)
+ : io_udp_ (io)
{
}
-
diff --git a/ace/RMCast/RMCast_UDP_Proxy.cpp b/ace/RMCast/RMCast_UDP_Proxy.cpp
new file mode 100644
index 00000000000..1fbad27f2cd
--- /dev/null
+++ b/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -0,0 +1,136 @@
+// $Id$
+
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_UDP_Proxy.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_UDP_Proxy, "$Id$")
+
+ACE_RMCast_UDP_Proxy::ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp,
+ const ACE_INET_Addr &addr,
+ ACE_RMCast_Module *module)
+ : io_udp_ (io_udp)
+ , peer_addr_ (addr)
+ , module_ (module)
+{
+}
+
+ACE_RMCast_UDP_Proxy::~ACE_RMCast_UDP_Proxy (void)
+{
+}
+
+int
+ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size)
+{
+ int type = buffer[0];
+
+ // @@ What should we do with invalid messages like this?
+ //
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ return 0;
+
+ if (type == ACE_RMCast::MT_POLL)
+ {
+ ACE_RMCast::Poll poll;
+ return this->module ()->poll (poll);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK_JOIN)
+ {
+ ACE_RMCast::Ack_Join ack_join;
+ const size_t header_size = 1 + sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ ack_join.next_sequence_number = ACE_NTOHL (tmp);
+ return this->module ()->ack_join (ack_join);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ ACE_RMCast::Ack_Leave ack_leave;
+ return this->module ()->ack_leave (ack_leave);
+ }
+
+ else if (type == ACE_RMCast::MT_DATA)
+ {
+ ACE_RMCast::Data data;
+ const size_t header_size = 1 + 3 * sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ data.sequence_number = ACE_NTOHL (tmp);
+
+ ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp),
+ sizeof(tmp));
+ data.total_size = ACE_NTOHL (tmp);
+
+ ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp),
+ sizeof(tmp));
+ data.fragment_offset = ACE_NTOHL (tmp);
+
+ // Pass it up the module...
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb, ACE_Message_Block, -1);
+ mb->size (size - header_size);
+ mb->copy (buffer + header_size, size - header_size);
+
+ data.payload = mb;
+ return this->module ()->data (data);
+ }
+
+ else if (type == ACE_RMCast::MT_JOIN)
+ {
+ ACE_RMCast::Join join;
+ return this->module ()->join (join);
+ }
+
+ else if (type == ACE_RMCast::MT_LEAVE)
+ {
+ ACE_RMCast::Leave leave;
+ return this->module ()->leave (leave);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK)
+ {
+ ACE_RMCast::Ack ack;
+
+ const size_t header_size = 1 + sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ ack.highest_in_sequence = ACE_NTOHL (tmp);
+ ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32),
+ sizeof(tmp));
+ ack.highest_received = ACE_NTOHL (tmp);
+
+ return this->module ()->ack (ack);
+ }
+
+ return 0;
+}
diff --git a/ace/RMCast/RMCast_Sender_Proxy.h b/ace/RMCast/RMCast_UDP_Proxy.h
index c6b51f78b48..aa7e08c65be 100644
--- a/ace/RMCast/RMCast_Sender_Proxy.h
+++ b/ace/RMCast/RMCast_UDP_Proxy.h
@@ -7,26 +7,28 @@
// ace
//
// = FILENAME
-// RMCast_Sender_Proxy.h
+// RMCast_UDP_Proxy.h
//
// = AUTHOR
// Carlos O'Ryan <coryan@uci.edu>
//
// ============================================================================
-#ifndef ACE_RMCAST_SENDER_PROXY_H
-#define ACE_RMCAST_SENDER_PROXY_H
+#ifndef ACE_RMCAST_UDP_PROXY_H
+#define ACE_RMCAST_UDP_PROXY_H
#include "ace/pre.h"
#include "RMCast.h"
+#include "ace/INET_Addr.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
class ACE_RMCast_Module;
+class ACE_RMCast_IO_UDP;
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy
+class ACE_RMCast_Export ACE_RMCast_UDP_Proxy
{
// = TITLE
// Reliable Multicast Sender Ambassador
@@ -35,27 +37,37 @@ class ACE_RMCast_Export ACE_RMCast_Sender_Proxy
// Implement an Ambassador for the reliable multicast senders.
//
public:
- ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module);
+ ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp,
+ const ACE_INET_Addr &peer_addr,
+ ACE_RMCast_Module *module);
// Constructor
- virtual ~ACE_RMCast_Sender_Proxy (void);
+ virtual ~ACE_RMCast_UDP_Proxy (void);
// Destructor
- ACE_RMCast_Module *module (void) const;
- // Return the internal module
+ int receive_message (char *buffer, size_t size);
+ // Receive the message
+
+ const ACE_INET_Addr &peer_addr (void) const;
+ // The address of the peer
- virtual int receive_message (char *buffer, size_t size) = 0;
- // A new message has been received, process it
+ ACE_RMCast_Module *module (void) const;
+ // The module
private:
+ ACE_RMCast_IO_UDP *io_udp_;
+ // The IO facade
+
+ ACE_INET_Addr peer_addr_;
+ // The address of the peer
+
ACE_RMCast_Module *module_;
- // Process the data, control messages are processed by the Sender
- // proxy
+ // Process the data and control messages.
};
#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy.i"
+#include "RMCast_UDP_Proxy.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_H */
+#endif /* ACE_RMCAST_UDP_PROXY_H */
diff --git a/ace/RMCast/RMCast_UDP_Proxy.i b/ace/RMCast/RMCast_UDP_Proxy.i
new file mode 100644
index 00000000000..8ef6142ed7c
--- /dev/null
+++ b/ace/RMCast/RMCast_UDP_Proxy.i
@@ -0,0 +1,13 @@
+// $Id$
+
+ACE_INLINE const ACE_INET_Addr&
+ACE_RMCast_UDP_Proxy::peer_addr (void) const
+{
+ return this->peer_addr_;
+}
+
+ACE_INLINE ACE_RMCast_Module *
+ACE_RMCast_UDP_Proxy::module (void) const
+{
+ return this->module_;
+}
diff --git a/ace/RMCast/RMCast_UDP_Receiver.cpp b/ace/RMCast/RMCast_UDP_Receiver.cpp
deleted file mode 100644
index eeb03f50bcf..00000000000
--- a/ace/RMCast/RMCast_UDP_Receiver.cpp
+++ /dev/null
@@ -1,241 +0,0 @@
-//
-// $Id$
-//
-
-#include "RMCast_UDP_Receiver.h"
-#include "RMCast_Sender_Proxy.h"
-#include "RMCast_Sender_Proxy_Factory.h"
-#include "ace/Handle_Set.h"
-#include "ace/Reactor.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_UDP_Receiver.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Receiver, "$Id$")
-
-ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void)
-{
-}
-
-int
-ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr,
- int reuse_addr,
- const ACE_TCHAR *net_if,
- int protocol_family,
- int protocol)
-{
- return this->dgram_.subscribe (mcast_addr,
- reuse_addr,
- net_if,
- protocol_family,
- protocol);
-}
-
-int
-ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv)
-{
- ACE_HANDLE h = this->dgram_.get_handle ();
- if (h == ACE_INVALID_HANDLE)
- return -1;
-
- ACE_Handle_Set handle_set;
- handle_set.set_bit (h);
-
- ACE_Countdown_Time countdown (tv);
-
- int r = ACE_OS::select (int(h) + 1,
- handle_set, 0, 0,
- tv);
- if (r == -1)
- {
- if (errno == EINTR)
- return 0;
- else
- return -1;
- }
- else if (r == 0)
- {
- return 0;
- }
-
- return this->handle_input (h);
-}
-
-int
-ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor)
-{
- this->eh_.reactor (reactor);
- return reactor->register_handler (&this->eh_,
- ACE_Event_Handler::READ_MASK);
-}
-
-int
-ACE_RMCast_UDP_Receiver::remove_handlers (void)
-{
- ACE_Reactor *r = this->eh_.reactor ();
- if (r != 0)
- {
- r->remove_handler (&this->eh_,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
- this->eh_.reactor (0);
- }
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE)
-{
- // @@ We should use a system constant instead of this literal
- const int max_udp_packet_size = 65536;
- char buffer[max_udp_packet_size];
-
- ACE_INET_Addr from_address;
- ssize_t r =
- this->dgram_.recv (buffer, sizeof(buffer), from_address);
-
- if (r == -1)
- {
- // @@ LOG??
- ACE_DEBUG ((LM_DEBUG,
- "RMCast_UDP_Receiver::handle_input () - "
- "error in recv\n"));
- return -1;
- }
-
- // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
-
- // @@ Locking!
-
- int type = buffer[0];
-
- ACE_RMCast_Sender_Proxy *sender_proxy;
- if (this->map_.find (from_address, sender_proxy) != 0)
- {
- // State == RS_NON_EXISTENT
-
- if (type == ACE_RMCast::MT_ACK
- || type == ACE_RMCast::MT_JOIN
- || type == ACE_RMCast::MT_LEAVE
- || type == ACE_RMCast::MT_ACK_LEAVE)
- {
- // All these message types indicate a problem, the should be
- // generated by receivers, not received by them.
- return 0;
- }
-
- // The message type is valid, we must create a new proxy,
- // initially in the JOINING state...
- sender_proxy =
- this->factory_->create ();
- if (sender_proxy == 0)
- {
- // @@ LOG??
- return 0;
- }
- if (this->map_.bind (from_address, sender_proxy) != 0)
- {
- // @@ LOG??
- return 0;
- }
-
- // Send back a JOIN message...
- return sender_proxy->receive_message (buffer, r);
- }
-
- if (type == ACE_RMCast::MT_ACK
- || type == ACE_RMCast::MT_JOIN
- || type == ACE_RMCast::MT_LEAVE
- || type == ACE_RMCast::MT_ACK_LEAVE
- || type < 0
- || type >= ACE_RMCast::MT_LAST)
- {
- // In this case the message is invalid, but the proxy is already
- // in the table, must destroy it because there was a violation
- // in the protocol....
-
- this->factory_->destroy (sender_proxy);
- this->map_.unbind (from_address);
- return 0;
- }
-
- return sender_proxy->receive_message (buffer, r);
-}
-
-ACE_HANDLE
-ACE_RMCast_UDP_Receiver::get_handle (void) const
-{
- return this->dgram_.get_handle ();
-}
-
-#if 0
-int
-ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_JOIN;
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer, 1, from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
- ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_ACK;
-
- ACE_UINT32 expected = sender_proxy->expected ();
- expected = ACE_HTONL (expected);
-
- ACE_UINT32 last_received = sender_proxy->last_received ();
- last_received = ACE_HTONL (last_received);
-
- ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected));
- ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received,
- sizeof(last_received));
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer,
- + sizeof(expected)
- + sizeof(last_received),
- from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_LEAVE;
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer, 1, from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-#endif /* 0 */
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*>;
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_UDP_Receiver.i b/ace/RMCast/RMCast_UDP_Receiver.i
deleted file mode 100644
index 81aeb8e2752..00000000000
--- a/ace/RMCast/RMCast_UDP_Receiver.i
+++ /dev/null
@@ -1,9 +0,0 @@
-// $Id$
-
-ACE_INLINE
-ACE_RMCast_UDP_Receiver::
- ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory)
- : factory_ (factory)
- , eh_ (this)
-{
-}
diff --git a/ace/RMCast/RMCast_UDP_Sender.cpp b/ace/RMCast/RMCast_UDP_Sender.cpp
deleted file mode 100644
index c02ad6fb9cf..00000000000
--- a/ace/RMCast/RMCast_UDP_Sender.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-//
-// $Id$
-//
-
-#include "RMCast_UDP_Sender.h"
-#include "RMCast.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_UDP_Sender.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_UDP_Sender, "$Id$")
-
-ACE_RMCast_UDP_Sender::ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr)
- : ACE_RMCast_Module ()
- , mcast_addr_ (mcast_addr)
-{
-}
-
-ACE_RMCast_UDP_Sender::~ACE_RMCast_UDP_Sender (void)
-{
-}
-
-int
-ACE_RMCast_UDP_Sender::open (void)
-{
- return this->dgram_.open (ACE_Addr::sap_any);
-}
-
-int
-ACE_RMCast_UDP_Sender::close (void)
-{
- return this->dgram_.close ();
-}
-
-int
-ACE_RMCast_UDP_Sender::put_data (ACE_RMCast::Data &data)
-{
- // The first message block contains the header
- // @@ TODO: We could keep the header pre-initialized, and only
- // update the portions that do change...
- ACE_UINT32 tmp;
- char header[1 + 3 * sizeof(ACE_UINT32)];
- header[0] = ACE_RMCast::MT_DATA;
-
- tmp = ACE_HTONL (data.sequence_number);
- ACE_OS::memcpy (header + 1,
- &tmp, sizeof(ACE_UINT32));
- tmp = ACE_HTONL (data.total_size);
- ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
- &tmp, sizeof(ACE_UINT32));
- tmp = ACE_HTONL (data.fragment_offset);
- ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
- &tmp, sizeof(ACE_UINT32));
-
- iovec iov[IOV_MAX];
- int iovcnt = 1;
-
- iov[0].iov_base = header;
- iov[0].iov_len = sizeof(header);
-
- ACE_Message_Block *mb = data.payload;
-
- for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
- {
- iov[iovcnt].iov_base = i->rd_ptr ();
- iov[iovcnt].iov_len = i->length ();
- iovcnt++;
- if (iovcnt >= IOV_MAX)
- return -1;
- }
-
- ACE_Time_Value tv (0, 10000);
- ACE_OS::sleep (tv);
- if (this->dgram_.send (iov, iovcnt,
- this->mcast_addr_) == -1)
- return -1;
-
-#if 0
- ACE_HEX_DUMP ((LM_DEBUG,
- (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
-#endif
-
- return 0;
-}
-
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_UDP_Sender.h b/ace/RMCast/RMCast_UDP_Sender.h
deleted file mode 100644
index 474ebbc7f27..00000000000
--- a/ace/RMCast/RMCast_UDP_Sender.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_UDP_Sender.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_UDP_SENDER_H
-#define ACE_RMCAST_UDP_SENDER_H
-#include "ace/pre.h"
-
-#include "RMCast_Module.h"
-#include "ace/SOCK_Dgram.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Export ACE_RMCast_UDP_Sender : public ACE_RMCast_Module
-{
- // = TITLE
- // Reliable Multicast UDP_Sender
- //
- // = DESCRIPTION
- // Implements a Facade to the classes that implement a reliable
- // multicast protocol.
-public:
- // = Initialization and termination methods.
- ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr);
- // Constructor
-
- virtual ~ACE_RMCast_UDP_Sender (void);
- // Destructor
-
- // = The RMCast_Module methods
- virtual int open (void);
- virtual int close (void);
- virtual int put_data (ACE_RMCast::Data &data);
- // Send the Message block, this is the callback invoked at the end
- // of the stack.
-
-protected:
- ACE_SOCK_Dgram dgram_;
- // This is the socket used to send the multicast data.
- // @@ This should be strategized, what if we want to use something
- // like ATM networks to send the data, then the types would be
- // different....
-
- ACE_INET_Addr mcast_addr_;
- // The multicast group we send to.
- // @@ Can we really strategize the addressing, without introducing
- // too much complexity? How can we decouple the reliability aspect
- // from the transport aspects of the system???
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_UDP_Sender.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_UDP_SENDER_H */
diff --git a/ace/RMCast/RMCast_UDP_Sender.i b/ace/RMCast/RMCast_UDP_Sender.i
deleted file mode 100644
index cfa1da318d3..00000000000
--- a/ace/RMCast/RMCast_UDP_Sender.i
+++ /dev/null
@@ -1 +0,0 @@
-// $Id$
diff --git a/protocols/ace/RMCast/README b/protocols/ace/RMCast/README
new file mode 100644
index 00000000000..2dd0c5d9cfc
--- /dev/null
+++ b/protocols/ace/RMCast/README
@@ -0,0 +1,57 @@
+# $Id$
+
+ This directory will contain a simple, small-scale reliable
+multicast framework for ACE. The framework is based on the ASX
+components of the ACE library: the protocol is implemented as a stack
+of interchangeable "modules", each one in charge of a very small task.
+For example, one module implements fragmentation and reassembly, other
+modules implement retransmission, send ACK and NAK messages, and
+maintain receiver membership.
+
+ The modules are replaced to achieve different levels of
+reliability. For example, the retransmission module can be either the
+"Best_Effort", "Semi_Reliable" or "Reliable" implementation. In the
+first case no retransmissions are performed, but lost messages are
+detected and reported to the receiver. The "Semi_Reliable" case
+messages are held for a pre-specified amount of time, and
+re-transmited if requested, but it is possible to loose some messages
+if multiple re-transmissions fail. As in the "Best_Effort" case the
+lost messages are detected and flagged to the application. Finally
+in the "Reliable" mode the senders are flowed controlled until enough
+messages are successfully transmitted.
+
+ In general the stack looks like this:
+
+
+SENDER:
+
+----------------------------------------------------------------
+Buffering : Save lost messages
+Retransmission : Retransmit
+----------------------------------------------------------------
+Fragmentation : Fragment messages in smaller chunks
+Reassembly : and ensure that the IOVMAX limit is not
+ : reached
+----------------------------------------------------------------
+Tranport : Encapsulate the specific transport media
+ : such as TCP/IP, ATM, or shared memory
+ : Demuxes incoming data to the right chain
+ : Change control messages and data messages
+ : to the right dynamic types.
+----------------------------------------------------------------
+
+RECEIVER:
+
+----------------------------------------------------------------
+Lost detection : Detect lost messages and send control
+ : messages back
+----------------------------------------------------------------
+Reassembly : Reassemble messages, fragment control
+Fragmentation : data
+----------------------------------------------------------------
+Transport : Group membership, ACT reception,
+ : handle keep-alive messages...
+----------------------------------------------------------------
+
+
+@@ TODO: Piggybacking...
diff --git a/protocols/ace/RMCast/RMCast.dsp b/protocols/ace/RMCast/RMCast.dsp
index 53fdbf5cb9c..e3a84679364 100644
--- a/protocols/ace/RMCast/RMCast.dsp
+++ b/protocols/ace/RMCast/RMCast.dsp
@@ -94,14 +94,50 @@ LINK32=link.exe
# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
# Begin Source File
+SOURCE=.\RMCast.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_IO_UDP.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.cpp
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.cpp
+# End Source File
# End Group
# Begin Group "Header Files"
# PROP Default_Filter "h;hpp;hxx;hm;inl"
# Begin Source File
+SOURCE=.\RMCast.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Export.h
# End Source File
# Begin Source File
@@ -110,42 +146,76 @@ SOURCE=.\RMCast_Fragment.h
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.h
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.h
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.h
+# End Source File
# End Group
# Begin Group "Inline Files"
# PROP Default_Filter "i"
# Begin Source File
+SOURCE=.\RMCast.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Fragment.i
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.i
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.i
# End Source File
-# End Group
-# Begin Group "Template Files"
-
-# PROP Default_Filter ""
# Begin Source File
-SOURCE=.\RMCast_Fragment.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Event_Handler.i
# End Source File
# Begin Source File
-SOURCE=.\RMCast_Reassembly.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Proxy.i
# End Source File
# End Group
+# Begin Group "Template Files"
+
+# PROP Default_Filter ""
+# End Group
# End Target
# End Project
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h
index 025f82a1bfb..654f391204b 100644
--- a/protocols/ace/RMCast/RMCast.h
+++ b/protocols/ace/RMCast/RMCast.h
@@ -62,8 +62,8 @@ public:
// +---------+----------------------+
// | 32 bits | fragment_offset |
// +---------+----------------------+
- // | 32 bits | payload_size |
- // +---------+----------------------+
+ // ? ? ? ? ? | 32 bits | payload_size |
+ // ? ? ? ? ? +---------+----------------------+
// | | payload |
// +---------+----------------------+
//
@@ -84,9 +84,9 @@ public:
// +---------+----------------------+
// | 8 bits | MT_ACK |
// +---------+----------------------+
- // | 32 bits | last_successful |
+ // | 32 bits | highest_in_sequence |
// +---------+----------------------+
- // | 32 bits | last_received |
+ // | 32 bits | highest_received |
// +---------+----------------------+
//
@@ -196,26 +196,30 @@ public:
ACE_Message_Block *payload;
};
- struct Ack
+ struct Poll
{
- ACE_UINT32 expected;
- ACE_UINT32 last_received;
};
- struct Join
+ struct Ack_Join
{
+ ACE_INT32 next_sequence_number;
};
- struct Leave
+ struct Ack_Leave
{
};
- struct Ack_Join
+ struct Ack
{
- ACE_INT32 next_sequence_number;
+ ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 highest_received;
};
- struct Ack_Leave
+ struct Join
+ {
+ };
+
+ struct Leave
{
};
};
diff --git a/protocols/ace/RMCast/RMCast_Fragment.cpp b/protocols/ace/RMCast/RMCast_Fragment.cpp
index b3baee4f972..976def7a241 100644
--- a/protocols/ace/RMCast/RMCast_Fragment.cpp
+++ b/protocols/ace/RMCast/RMCast_Fragment.cpp
@@ -25,7 +25,7 @@ ACE_RMCast_Fragment::~ACE_RMCast_Fragment (void)
}
int
-ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
+ACE_RMCast_Fragment::data (ACE_RMCast::Data &received_data)
{
if (this->next () == 0)
return 0;
@@ -38,7 +38,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
// @@ We should keep the total size precomputed
data.total_size = mb->total_size ();
- // We must leave room for the header
+ // We must leave room for the header
#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
const int ACE_RMCAST_WRITEV_MAX = IOV_MAX - 2;
#else
@@ -126,7 +126,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
+ last_sent_mb_len);
data.payload = blocks;
- if (this->next ()->put_data (data) == -1)
+ if (this->next ()->data (data) == -1)
return -1;
// adjust the offset
@@ -172,7 +172,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
// have:
if (iovcnt == ACE_RMCAST_WRITEV_MAX)
{
- if (this->next ()->put_data (data) == -1)
+ if (this->next ()->data (data) == -1)
return -1;
iovcnt = 0;
@@ -184,5 +184,5 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
if (iovcnt == 0)
return 0;
- return this->next ()->put_data (data);
+ return this->next ()->data (data);
}
diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h
index e42440b6c12..7b64d763ebc 100644
--- a/protocols/ace/RMCast/RMCast_Fragment.h
+++ b/protocols/ace/RMCast/RMCast_Fragment.h
@@ -40,7 +40,7 @@ public:
// feedback from the lower layer (transport?)
// = The ACE_RMCast_Module methods
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
size_t max_fragment_size_;
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
new file mode 100644
index 00000000000..89cc7ae3c3a
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
@@ -0,0 +1,418 @@
+//
+// $Id$
+//
+
+#include "RMCast_IO_UDP.h"
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module_Factory.h"
+#include "ace/Handle_Set.h"
+#include "ace/Reactor.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_IO_UDP.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_IO_UDP, "$Id$")
+
+ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void)
+{
+}
+
+int
+ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr,
+ int reuse_addr,
+ const ACE_TCHAR *net_if,
+ int protocol_family,
+ int protocol)
+{
+ this->mcast_group_ = mcast_addr;
+ return this->dgram_.subscribe (mcast_addr,
+ reuse_addr,
+ net_if,
+ protocol_family,
+ protocol);
+}
+
+int
+ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv)
+{
+ ACE_HANDLE h = this->dgram_.get_handle ();
+ if (h == ACE_INVALID_HANDLE)
+ return -1;
+
+ ACE_Handle_Set handle_set;
+ handle_set.set_bit (h);
+
+ ACE_Countdown_Time countdown (tv);
+
+ int r = ACE_OS::select (int(h) + 1,
+ handle_set, 0, 0,
+ tv);
+ if (r == -1)
+ {
+ if (errno == EINTR)
+ return 0;
+ else
+ return -1;
+ }
+ else if (r == 0)
+ {
+ return 0;
+ }
+
+ return this->handle_input (h);
+}
+
+int
+ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor)
+{
+ this->eh_.reactor (reactor);
+ return reactor->register_handler (&this->eh_,
+ ACE_Event_Handler::READ_MASK);
+}
+
+int
+ACE_RMCast_IO_UDP::remove_handlers (void)
+{
+ ACE_Reactor *r = this->eh_.reactor ();
+ if (r != 0)
+ {
+ r->remove_handler (&this->eh_,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ this->eh_.reactor (0);
+ }
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE)
+{
+ // @@ We should use a system constant instead of this literal
+ const int max_udp_packet_size = 65536;
+ char buffer[max_udp_packet_size];
+
+ ACE_INET_Addr from_address;
+ ssize_t r =
+ this->dgram_.recv (buffer, sizeof(buffer), from_address);
+
+ if (r == -1)
+ {
+ // @@ LOG??
+ ACE_DEBUG ((LM_DEBUG,
+ "RMCast_IO_UDP::handle_input () - "
+ "error in recv\n"));
+ return -1;
+ }
+
+ // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
+
+ // @@ Locking!
+
+ int type = buffer[0];
+
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ {
+ // @@ Log: invalid message type!!
+ // @@ TODO: should we return -1? The socket is still valid, it
+ // makes little sense to destroy it just because one remote
+ // sender is sending invalid messages. Maybe we should
+ // strategize this too, and report the problem to the
+ // application, this could indicate a misconfiguration or
+ // something worse...
+
+ // In any case the proxy should be destroyed, its peer is making
+ // something really wrong.
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.unbind (from_address, proxy) == 0)
+ {
+ this->factory_->destroy (proxy->module ());
+ delete proxy;
+ }
+ return 0;
+ }
+
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.find (from_address, proxy) != 0)
+ {
+ // State == RS_NON_EXISTENT
+
+ // @@ We should validate the message *before* creating the
+ // object, all we need is some sort of validation strategy, a
+ // different one for the receiver and another one for the
+ // sender.
+#if 0
+ if (type == ACE_RMCast::MT_ACK
+ || type == ACE_RMCast::MT_JOIN
+ || type == ACE_RMCast::MT_LEAVE
+ || type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ // All these message types indicate a problem, the should be
+ // generated by receivers, not received by them.
+ return 0;
+ }
+#endif /* 0 */
+
+ // The message type is valid, we must create a new proxy,
+ // initially in the JOINING state...
+ ACE_RMCast_Module *module = this->factory_->create (this);
+ if (module == 0)
+ {
+ // @@ LOG??
+ // Try to continue working, maybe the module can be created
+ // later.
+ return 0;
+ }
+ ACE_NEW_RETURN (proxy,
+ ACE_RMCast_UDP_Proxy(this,
+ from_address,
+ module),
+ 0);
+
+ if (this->map_.bind (from_address, proxy) != 0)
+ {
+ // @@ LOG??
+ return 0;
+ }
+
+ }
+
+ // Have the proxy process the message and do the right thing.
+ return proxy->receive_message (buffer, r);
+}
+
+ACE_HANDLE
+ACE_RMCast_IO_UDP::get_handle (void) const
+{
+ return this->dgram_.get_handle ();
+}
+
+int
+ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data)
+{
+ return this->send_data (data, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll)
+{
+ return this->send_poll (poll, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ return this->send_ack_join (ack_join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ return this->send_ack_leave (ack_leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack)
+{
+ return this->send_ack (ack, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join)
+{
+ return this->send_join (join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave)
+{
+ return this->send_leave (leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data,
+ const ACE_INET_Addr &to)
+{
+ // The first message block contains the header
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ ACE_UINT32 tmp;
+ char header[1 + 3 * sizeof(ACE_UINT32)];
+ header[0] = ACE_RMCast::MT_DATA;
+
+ tmp = ACE_HTONL (data.sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.total_size);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.fragment_offset);
+ ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 1;
+
+ iov[0].iov_base = header;
+ iov[0].iov_len = sizeof(header);
+
+ ACE_Message_Block *mb = data.payload;
+
+ for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
+ {
+ iov[iovcnt].iov_base = i->rd_ptr ();
+ iov[iovcnt].iov_len = i->length ();
+ iovcnt++;
+ if (iovcnt >= IOV_MAX)
+ return -1;
+ }
+
+ // @@ This pacing stuff here reduced the number of packet lost in
+ // loopback tests, but it should be taken out for real applications
+ // (or at least made configurable!)
+ ACE_Time_Value tv (0, 10000);
+ ACE_OS::sleep (tv);
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (iov, iovcnt, to) == -1)
+ return -1;
+
+#if 0
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
+#endif
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &poll,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_POLL;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_JOIN;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &ack_leave,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (ack.highest_received);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_JOIN;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>;
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/RMCast/RMCast_UDP_Receiver.h b/protocols/ace/RMCast/RMCast_IO_UDP.h
index bfc56d89705..bdcccabe6e1 100644
--- a/ace/RMCast/RMCast_UDP_Receiver.h
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.h
@@ -10,35 +10,36 @@
//
// ============================================================================
-#ifndef ACE_RMCAST_UDP_RECEIVER_H
-#define ACE_RMCAST_UDP_RECEIVER_H
+#ifndef ACE_RMCAST_IO_UDP_H
+#define ACE_RMCAST_IO_UDP_H
#include "ace/pre.h"
+#include "RMCast_Module.h"
#include "RMCast_UDP_Event_Handler.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/Hash_Map_Manager.h"
#include "ace/Synch.h"
+#include "ace/INET_Addr.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_RMCast_Sender_Proxy;
-class ACE_RMCast_Sender_Proxy_Factory;
+class ACE_RMCast_UDP_Proxy;
+class ACE_RMCast_Module_Factory;
class ACE_Reactor;
class ACE_Time_Value;
-class ACE_INET_Addr;
-class ACE_RMCast_Export ACE_RMCast_UDP_Receiver
+class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module
{
public:
- ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory);
+ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory);
// Constructor
- // <factory> is used to create the Sender_Proxy and Modules that
- // process incoming messages.
- // The caller owns <factory>.
+ // <factory> is used to create the modules for each proxy that
+ // process incoming messages. The class does *not* assume ownership
+ // of <factory>, the caller owns it.
- ~ACE_RMCast_UDP_Receiver (void);
+ ~ACE_RMCast_IO_UDP (void);
// Destructor
int subscribe (const ACE_INET_Addr &mcast_addr,
@@ -69,25 +70,36 @@ public:
ACE_HANDLE get_handle (void) const;
// Obtain the handle for the underlying socket
-private:
- int send_join (ACE_INET_Addr &from);
- // Send a JOIN message
-
- int send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
- ACE_INET_Addr &from);
- // Send an ACK message
-
- int send_leave (ACE_INET_Addr &from);
- // Send a LEAVE messsage
+ // Send back to the remove object represented by <proxy>
+ int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &);
+ int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &);
+ int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &);
+ int send_ack_leave (ACE_RMCast::Ack_Leave &, const ACE_INET_Addr &);
+ int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &);
+ int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &);
+ int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &);
+
+ // = The RMCast_Module methods
+ virtual int data (ACE_RMCast::Data &);
+ virtual int poll (ACE_RMCast::Poll &);
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+ virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ virtual int ack (ACE_RMCast::Ack &);
+ virtual int join (ACE_RMCast::Join &);
+ virtual int leave (ACE_RMCast::Leave &);
+ // The messages are sent to the multicast group
private:
- ACE_RMCast_Sender_Proxy_Factory *factory_;
- // The factory used to create Sender proxies
+ ACE_RMCast_Module_Factory *factory_;
+ // The factory used to create the modules attached to each proxy
+
+ ACE_INET_Addr mcast_group_;
+ // The multicast group we subscribe and send to
ACE_SOCK_Dgram_Mcast dgram_;
// The socket
- typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex> Map;
+ typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map;
Map map_;
ACE_RMCast_UDP_Event_Handler eh_;
@@ -95,8 +107,8 @@ private:
};
#if defined (__ACE_INLINE__)
-#include "RMCast_UDP_Receiver.i"
+#include "RMCast_IO_UDP.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
-#endif /* ACE_RMCAST_UDP_RECEIVER_H */
+#endif /* ACE_RMCAST_IO_UDP_H */
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.i b/protocols/ace/RMCast/RMCast_IO_UDP.i
new file mode 100644
index 00000000000..ddacc5694ad
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.i
@@ -0,0 +1,9 @@
+// $Id$
+
+ACE_INLINE
+ACE_RMCast_IO_UDP::
+ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory)
+ : factory_ (factory)
+ , eh_ (this)
+{
+}
diff --git a/protocols/ace/RMCast/RMCast_Module.cpp b/protocols/ace/RMCast/RMCast_Module.cpp
index b47694abe4d..632d905f900 100644
--- a/protocols/ace/RMCast/RMCast_Module.cpp
+++ b/protocols/ace/RMCast/RMCast_Module.cpp
@@ -55,3 +55,59 @@ ACE_RMCast_Module::close (void)
{
return 0;
}
+
+int
+ACE_RMCast_Module::data (ACE_RMCast::Data &data)
+{
+ if (this->next () != 0)
+ return this->next ()->data (data);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::poll (ACE_RMCast::Poll &poll)
+{
+ if (this->next () != 0)
+ return this->next ()->poll (poll);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ if (this->next () != 0)
+ return this->next ()->ack_join (ack_join);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ if (this->next () != 0)
+ return this->next ()->ack_leave (ack_leave);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack (ACE_RMCast::Ack &ack)
+{
+ if (this->next () != 0)
+ return this->next ()->ack (ack);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::join (ACE_RMCast::Join &join)
+{
+ if (this->next () != 0)
+ return this->next ()->join (join);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::leave (ACE_RMCast::Leave &leave)
+{
+ if (this->next () != 0)
+ return this->next ()->leave (leave);
+ return 0;
+}
diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h
index 30f3da2f4fe..9f83c2d5be4 100644
--- a/protocols/ace/RMCast/RMCast_Module.h
+++ b/protocols/ace/RMCast/RMCast_Module.h
@@ -59,7 +59,13 @@ public:
virtual int close (void);
// Close the module.
- virtual int put_data (ACE_RMCast::Data &data) = 0;
+ virtual int data (ACE_RMCast::Data &);
+ virtual int poll (ACE_RMCast::Poll &);
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+ virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ virtual int ack (ACE_RMCast::Ack &);
+ virtual int join (ACE_RMCast::Join &);
+ virtual int leave (ACE_RMCast::Leave &);
// Push data down the stack
private:
diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.cpp b/protocols/ace/RMCast/RMCast_Module_Factory.cpp
new file mode 100644
index 00000000000..b749048a78c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.cpp
@@ -0,0 +1,13 @@
+// $Id$
+
+#include "RMCast_Module_Factory.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_Module_Factory.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Module_Factory, "$Id$")
+
+ACE_RMCast_Module_Factory::~ACE_RMCast_Module_Factory (void)
+{
+}
diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h
new file mode 100644
index 00000000000..722ad87d678
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.h
@@ -0,0 +1,50 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// RMCast_Module_Factory.h
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_MODULE_FACTORY_H
+#define ACE_RMCAST_MODULE_FACTORY_H
+#include "ace/pre.h"
+
+#include "RMCast.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Module;
+class ACE_RMCast_IO_UDP;
+
+class ACE_RMCast_Export ACE_RMCast_Module_Factory
+{
+ // = DESCRIPTION
+ //
+public:
+ virtual ~ACE_RMCast_Module_Factory (void);
+ // Destructor
+
+ virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0;
+ // Create a new proxy
+
+ virtual void destroy (ACE_RMCast_Module *) = 0;
+ // Destroy a proxy
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Module_Factory.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_MODULE_FACTORY_H */
diff --git a/ace/RMCast/RMCast_Sender_Proxy_Factory.i b/protocols/ace/RMCast/RMCast_Module_Factory.i
index cfa1da318d3..cfa1da318d3 100644
--- a/ace/RMCast/RMCast_Sender_Proxy_Factory.i
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.i
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp
index a52791e1ebf..ba2e9b79c1a 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.cpp
+++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp
@@ -34,7 +34,7 @@ ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
}
int
-ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
+ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
{
if (this->next () == 0)
return 0;
@@ -42,7 +42,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
if (data.payload->length () + data.fragment_offset > data.total_size)
{
ACE_DEBUG ((LM_DEBUG,
- "RMCast_Reassembly::put_data - invalid size\n"));
+ "RMCast_Reassembly::data - invalid size\n"));
return -1; // Corrupt message?
}
@@ -92,7 +92,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
downstream_data.fragment_offset = 0;
downstream_data.payload = message->message_body ();
- int r = this->next ()->put_data (downstream_data);
+ int r = this->next ()->data (downstream_data);
delete message;
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h
index 0982d059c7c..0bf0c3a61ee 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.h
+++ b/protocols/ace/RMCast/RMCast_Reassembly.h
@@ -34,7 +34,7 @@ public:
// Destructor
// = The ACE_RMCast_Module methods
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
ACE_SYNCH_MUTEX mutex_;
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp
deleted file mode 100644
index ff1b7b33f15..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp
+++ /dev/null
@@ -1,20 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy.h"
-#include "RMCast_Module.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy, "$Id$")
-
-ACE_RMCast_Sender_Proxy::ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module)
- : module_ (module)
-{
-}
-
-ACE_RMCast_Sender_Proxy::~ACE_RMCast_Sender_Proxy (void)
-{
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.i b/protocols/ace/RMCast/RMCast_Sender_Proxy.i
deleted file mode 100644
index b47573711ea..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy.i
+++ /dev/null
@@ -1,7 +0,0 @@
-// $Id$
-
-ACE_INLINE ACE_RMCast_Module *
-ACE_RMCast_Sender_Proxy::module (void) const
-{
- return this->module_;
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp
deleted file mode 100644
index ba525f245bc..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy_Best_Effort.h"
-#include "RMCast_Module.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy_Best_Effort.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy_Best_Effort, "$Id$")
-
-ACE_RMCast_Sender_Proxy_Best_Effort::
- ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module)
- : ACE_RMCast_Sender_Proxy (module)
-{
-}
-
-ACE_RMCast_Sender_Proxy_Best_Effort::
- ~ACE_RMCast_Sender_Proxy_Best_Effort (void)
-{
-}
-
-int
-ACE_RMCast_Sender_Proxy_Best_Effort::receive_message (char *buffer,
- size_t size)
-{
- int type = buffer[0];
-
- // All control messages are ignored...
- if (type != ACE_RMCast::MT_DATA)
- return 0;
-
- // @@ Push the event through the stack
-#if 0
- ACE_DEBUG ((LM_DEBUG,
- "Proxy(%x) - received data\n", long(this)));
- ACE_HEX_DUMP ((LM_DEBUG, buffer, header, "Proxy"));
-#endif
-
- const size_t header_size = 1 + 3 * sizeof(ACE_UINT32);
- if (size < header_size)
- {
- // The message is too small
- return 0;
- }
-
- ACE_UINT32 tmp;
-
- ACE_RMCast::Data data;
-
- ACE_OS::memcpy (&tmp, buffer + 1,
- sizeof(tmp));
- data.sequence_number = ACE_NTOHL (tmp);
-
- ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp),
- sizeof(tmp));
- data.total_size = ACE_NTOHL (tmp);
-
- ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp),
- sizeof(tmp));
- data.fragment_offset = ACE_NTOHL (tmp);
-
- // Pass it up the module...
- ACE_Message_Block *mb;
- ACE_NEW_RETURN (mb, ACE_Message_Block, -1);
- mb->size (size - header_size);
- mb->copy (buffer + header_size, size - header_size);
-
- data.payload = mb;
- return this->module ()->put_data (data);
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h
deleted file mode 100644
index 304e026afc3..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_Sender_Proxy_Best_Effort.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H
-#define ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H
-#include "ace/pre.h"
-
-#include "RMCast_Sender_Proxy.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Module;
-
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Best_Effort : public ACE_RMCast_Sender_Proxy
-{
- // = TITLE
- // Reliable Multicast Sender Ambassador
- //
- // = DESCRIPTION
- // Implement an Ambassador for the reliable multicast senders.
- //
-public:
- ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module);
- // Constructor
-
- ~ACE_RMCast_Sender_Proxy_Best_Effort (void);
- // Destructor
-
- virtual int receive_message (char *buffer, size_t size);
- // A DATA message was received.
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy_Best_Effort.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H */
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i
deleted file mode 100644
index cfa1da318d3..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i
+++ /dev/null
@@ -1 +0,0 @@
-// $Id$
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp
deleted file mode 100644
index 48a82b5dfbc..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp
+++ /dev/null
@@ -1,13 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy_Factory.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy_Factory.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy_Factory, "$Id$")
-
-ACE_RMCast_Sender_Proxy_Factory::~ACE_RMCast_Sender_Proxy_Factory (void)
-{
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h
deleted file mode 100644
index 7dff4d2796f..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_Sender_Proxy_Factory.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_SENDER_PROXY_FACTORY_H
-#define ACE_RMCAST_SENDER_PROXY_FACTORY_H
-#include "ace/pre.h"
-
-#include "RMCast.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Sender_Proxy;
-
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Factory
-{
- // = DESCRIPTION
- // Defines the interface to create Sender_Proxies.
- // The application provides a Sender_Proxy_Factory, this is used
- // by the receiver side to create a different proxy for each
- // remote sender. The application configures the proxy with the
- // correct modules to process incoming events and achieve the
- // desired level of reliability.
- //
-public:
- virtual ~ACE_RMCast_Sender_Proxy_Factory (void);
- // Destructor
-
- virtual ACE_RMCast_Sender_Proxy *create (void) = 0;
- // Create a new proxy
-
- virtual void destroy (ACE_RMCast_Sender_Proxy *) = 0;
- // Destroy a proxy
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy_Factory.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_FACTORY_H */
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i
deleted file mode 100644
index cfa1da318d3..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i
+++ /dev/null
@@ -1 +0,0 @@
-// $Id$
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp
index 69cfc337113..e5ff8da2761 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp
@@ -3,7 +3,7 @@
//
#include "RMCast_UDP_Event_Handler.h"
-#include "RMCast_UDP_Receiver.h"
+#include "RMCast_IO_UDP.h"
#if !defined (__ACE_INLINE__)
# include "RMCast_UDP_Event_Handler.i"
@@ -18,19 +18,19 @@ ACE_RMCast_UDP_Event_Handler::~ACE_RMCast_UDP_Event_Handler (void)
ACE_HANDLE
ACE_RMCast_UDP_Event_Handler::get_handle (void) const
{
- return this->receiver_->get_handle ();
+ return this->io_udp_->get_handle ();
}
int
ACE_RMCast_UDP_Event_Handler::handle_input (ACE_HANDLE h)
{
- return this->receiver_->handle_input (h);
+ return this->io_udp_->handle_input (h);
}
int
ACE_RMCast_UDP_Event_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
- // @@ return this->receiver_->handle_timeout ();
+ // @@ return this->io_udp_->handle_timeout ();
return 0;
}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
index 193d7038cd8..02798cee7f8 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
@@ -4,7 +4,7 @@
//
// = DESCRIPTION
// Implement an adapter between the ACE Reactor and the
-// ACE_RMCast_UDP_Receiver
+// ACE_RMCast_IO_UDP
//
// = AUTHOR
// Carlos O'Ryan <coryan@uci.edu>
@@ -22,13 +22,13 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_RMCast_UDP_Receiver;
+class ACE_RMCast_IO_UDP;
class ACE_INET_Addr;
class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler
{
public:
- ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver);
+ ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver);
// Constructor
~ACE_RMCast_UDP_Event_Handler (void);
@@ -41,7 +41,7 @@ public:
const void *act = 0);
private:
- ACE_RMCast_UDP_Receiver *receiver_;
+ ACE_RMCast_IO_UDP *io_udp_;
// The sender
};
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i
index b35aeefa3f4..99b4c0ac7e5 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i
@@ -2,8 +2,7 @@
ACE_INLINE
ACE_RMCast_UDP_Event_Handler::
-ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver)
- : receiver_ (receiver)
+ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io)
+ : io_udp_ (io)
{
}
-
diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
new file mode 100644
index 00000000000..1fbad27f2cd
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -0,0 +1,136 @@
+// $Id$
+
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_UDP_Proxy.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_UDP_Proxy, "$Id$")
+
+ACE_RMCast_UDP_Proxy::ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp,
+ const ACE_INET_Addr &addr,
+ ACE_RMCast_Module *module)
+ : io_udp_ (io_udp)
+ , peer_addr_ (addr)
+ , module_ (module)
+{
+}
+
+ACE_RMCast_UDP_Proxy::~ACE_RMCast_UDP_Proxy (void)
+{
+}
+
+int
+ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size)
+{
+ int type = buffer[0];
+
+ // @@ What should we do with invalid messages like this?
+ //
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ return 0;
+
+ if (type == ACE_RMCast::MT_POLL)
+ {
+ ACE_RMCast::Poll poll;
+ return this->module ()->poll (poll);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK_JOIN)
+ {
+ ACE_RMCast::Ack_Join ack_join;
+ const size_t header_size = 1 + sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ ack_join.next_sequence_number = ACE_NTOHL (tmp);
+ return this->module ()->ack_join (ack_join);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ ACE_RMCast::Ack_Leave ack_leave;
+ return this->module ()->ack_leave (ack_leave);
+ }
+
+ else if (type == ACE_RMCast::MT_DATA)
+ {
+ ACE_RMCast::Data data;
+ const size_t header_size = 1 + 3 * sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ data.sequence_number = ACE_NTOHL (tmp);
+
+ ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp),
+ sizeof(tmp));
+ data.total_size = ACE_NTOHL (tmp);
+
+ ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp),
+ sizeof(tmp));
+ data.fragment_offset = ACE_NTOHL (tmp);
+
+ // Pass it up the module...
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb, ACE_Message_Block, -1);
+ mb->size (size - header_size);
+ mb->copy (buffer + header_size, size - header_size);
+
+ data.payload = mb;
+ return this->module ()->data (data);
+ }
+
+ else if (type == ACE_RMCast::MT_JOIN)
+ {
+ ACE_RMCast::Join join;
+ return this->module ()->join (join);
+ }
+
+ else if (type == ACE_RMCast::MT_LEAVE)
+ {
+ ACE_RMCast::Leave leave;
+ return this->module ()->leave (leave);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK)
+ {
+ ACE_RMCast::Ack ack;
+
+ const size_t header_size = 1 + sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ ack.highest_in_sequence = ACE_NTOHL (tmp);
+ ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32),
+ sizeof(tmp));
+ ack.highest_received = ACE_NTOHL (tmp);
+
+ return this->module ()->ack (ack);
+ }
+
+ return 0;
+}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.h b/protocols/ace/RMCast/RMCast_UDP_Proxy.h
index c6b51f78b48..aa7e08c65be 100644
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy.h
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.h
@@ -7,26 +7,28 @@
// ace
//
// = FILENAME
-// RMCast_Sender_Proxy.h
+// RMCast_UDP_Proxy.h
//
// = AUTHOR
// Carlos O'Ryan <coryan@uci.edu>
//
// ============================================================================
-#ifndef ACE_RMCAST_SENDER_PROXY_H
-#define ACE_RMCAST_SENDER_PROXY_H
+#ifndef ACE_RMCAST_UDP_PROXY_H
+#define ACE_RMCAST_UDP_PROXY_H
#include "ace/pre.h"
#include "RMCast.h"
+#include "ace/INET_Addr.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
class ACE_RMCast_Module;
+class ACE_RMCast_IO_UDP;
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy
+class ACE_RMCast_Export ACE_RMCast_UDP_Proxy
{
// = TITLE
// Reliable Multicast Sender Ambassador
@@ -35,27 +37,37 @@ class ACE_RMCast_Export ACE_RMCast_Sender_Proxy
// Implement an Ambassador for the reliable multicast senders.
//
public:
- ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module);
+ ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp,
+ const ACE_INET_Addr &peer_addr,
+ ACE_RMCast_Module *module);
// Constructor
- virtual ~ACE_RMCast_Sender_Proxy (void);
+ virtual ~ACE_RMCast_UDP_Proxy (void);
// Destructor
- ACE_RMCast_Module *module (void) const;
- // Return the internal module
+ int receive_message (char *buffer, size_t size);
+ // Receive the message
+
+ const ACE_INET_Addr &peer_addr (void) const;
+ // The address of the peer
- virtual int receive_message (char *buffer, size_t size) = 0;
- // A new message has been received, process it
+ ACE_RMCast_Module *module (void) const;
+ // The module
private:
+ ACE_RMCast_IO_UDP *io_udp_;
+ // The IO facade
+
+ ACE_INET_Addr peer_addr_;
+ // The address of the peer
+
ACE_RMCast_Module *module_;
- // Process the data, control messages are processed by the Sender
- // proxy
+ // Process the data and control messages.
};
#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy.i"
+#include "RMCast_UDP_Proxy.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_H */
+#endif /* ACE_RMCAST_UDP_PROXY_H */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.i b/protocols/ace/RMCast/RMCast_UDP_Proxy.i
new file mode 100644
index 00000000000..8ef6142ed7c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.i
@@ -0,0 +1,13 @@
+// $Id$
+
+ACE_INLINE const ACE_INET_Addr&
+ACE_RMCast_UDP_Proxy::peer_addr (void) const
+{
+ return this->peer_addr_;
+}
+
+ACE_INLINE ACE_RMCast_Module *
+ACE_RMCast_UDP_Proxy::module (void) const
+{
+ return this->module_;
+}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp b/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp
deleted file mode 100644
index eeb03f50bcf..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp
+++ /dev/null
@@ -1,241 +0,0 @@
-//
-// $Id$
-//
-
-#include "RMCast_UDP_Receiver.h"
-#include "RMCast_Sender_Proxy.h"
-#include "RMCast_Sender_Proxy_Factory.h"
-#include "ace/Handle_Set.h"
-#include "ace/Reactor.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_UDP_Receiver.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Receiver, "$Id$")
-
-ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void)
-{
-}
-
-int
-ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr,
- int reuse_addr,
- const ACE_TCHAR *net_if,
- int protocol_family,
- int protocol)
-{
- return this->dgram_.subscribe (mcast_addr,
- reuse_addr,
- net_if,
- protocol_family,
- protocol);
-}
-
-int
-ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv)
-{
- ACE_HANDLE h = this->dgram_.get_handle ();
- if (h == ACE_INVALID_HANDLE)
- return -1;
-
- ACE_Handle_Set handle_set;
- handle_set.set_bit (h);
-
- ACE_Countdown_Time countdown (tv);
-
- int r = ACE_OS::select (int(h) + 1,
- handle_set, 0, 0,
- tv);
- if (r == -1)
- {
- if (errno == EINTR)
- return 0;
- else
- return -1;
- }
- else if (r == 0)
- {
- return 0;
- }
-
- return this->handle_input (h);
-}
-
-int
-ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor)
-{
- this->eh_.reactor (reactor);
- return reactor->register_handler (&this->eh_,
- ACE_Event_Handler::READ_MASK);
-}
-
-int
-ACE_RMCast_UDP_Receiver::remove_handlers (void)
-{
- ACE_Reactor *r = this->eh_.reactor ();
- if (r != 0)
- {
- r->remove_handler (&this->eh_,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
- this->eh_.reactor (0);
- }
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE)
-{
- // @@ We should use a system constant instead of this literal
- const int max_udp_packet_size = 65536;
- char buffer[max_udp_packet_size];
-
- ACE_INET_Addr from_address;
- ssize_t r =
- this->dgram_.recv (buffer, sizeof(buffer), from_address);
-
- if (r == -1)
- {
- // @@ LOG??
- ACE_DEBUG ((LM_DEBUG,
- "RMCast_UDP_Receiver::handle_input () - "
- "error in recv\n"));
- return -1;
- }
-
- // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
-
- // @@ Locking!
-
- int type = buffer[0];
-
- ACE_RMCast_Sender_Proxy *sender_proxy;
- if (this->map_.find (from_address, sender_proxy) != 0)
- {
- // State == RS_NON_EXISTENT
-
- if (type == ACE_RMCast::MT_ACK
- || type == ACE_RMCast::MT_JOIN
- || type == ACE_RMCast::MT_LEAVE
- || type == ACE_RMCast::MT_ACK_LEAVE)
- {
- // All these message types indicate a problem, the should be
- // generated by receivers, not received by them.
- return 0;
- }
-
- // The message type is valid, we must create a new proxy,
- // initially in the JOINING state...
- sender_proxy =
- this->factory_->create ();
- if (sender_proxy == 0)
- {
- // @@ LOG??
- return 0;
- }
- if (this->map_.bind (from_address, sender_proxy) != 0)
- {
- // @@ LOG??
- return 0;
- }
-
- // Send back a JOIN message...
- return sender_proxy->receive_message (buffer, r);
- }
-
- if (type == ACE_RMCast::MT_ACK
- || type == ACE_RMCast::MT_JOIN
- || type == ACE_RMCast::MT_LEAVE
- || type == ACE_RMCast::MT_ACK_LEAVE
- || type < 0
- || type >= ACE_RMCast::MT_LAST)
- {
- // In this case the message is invalid, but the proxy is already
- // in the table, must destroy it because there was a violation
- // in the protocol....
-
- this->factory_->destroy (sender_proxy);
- this->map_.unbind (from_address);
- return 0;
- }
-
- return sender_proxy->receive_message (buffer, r);
-}
-
-ACE_HANDLE
-ACE_RMCast_UDP_Receiver::get_handle (void) const
-{
- return this->dgram_.get_handle ();
-}
-
-#if 0
-int
-ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_JOIN;
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer, 1, from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
- ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_ACK;
-
- ACE_UINT32 expected = sender_proxy->expected ();
- expected = ACE_HTONL (expected);
-
- ACE_UINT32 last_received = sender_proxy->last_received ();
- last_received = ACE_HTONL (last_received);
-
- ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected));
- ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received,
- sizeof(last_received));
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer,
- + sizeof(expected)
- + sizeof(last_received),
- from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_LEAVE;
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer, 1, from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-#endif /* 0 */
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*>;
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.i b/protocols/ace/RMCast/RMCast_UDP_Receiver.i
deleted file mode 100644
index 81aeb8e2752..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Receiver.i
+++ /dev/null
@@ -1,9 +0,0 @@
-// $Id$
-
-ACE_INLINE
-ACE_RMCast_UDP_Receiver::
- ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory)
- : factory_ (factory)
- , eh_ (this)
-{
-}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.cpp b/protocols/ace/RMCast/RMCast_UDP_Sender.cpp
deleted file mode 100644
index c02ad6fb9cf..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Sender.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-//
-// $Id$
-//
-
-#include "RMCast_UDP_Sender.h"
-#include "RMCast.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_UDP_Sender.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_UDP_Sender, "$Id$")
-
-ACE_RMCast_UDP_Sender::ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr)
- : ACE_RMCast_Module ()
- , mcast_addr_ (mcast_addr)
-{
-}
-
-ACE_RMCast_UDP_Sender::~ACE_RMCast_UDP_Sender (void)
-{
-}
-
-int
-ACE_RMCast_UDP_Sender::open (void)
-{
- return this->dgram_.open (ACE_Addr::sap_any);
-}
-
-int
-ACE_RMCast_UDP_Sender::close (void)
-{
- return this->dgram_.close ();
-}
-
-int
-ACE_RMCast_UDP_Sender::put_data (ACE_RMCast::Data &data)
-{
- // The first message block contains the header
- // @@ TODO: We could keep the header pre-initialized, and only
- // update the portions that do change...
- ACE_UINT32 tmp;
- char header[1 + 3 * sizeof(ACE_UINT32)];
- header[0] = ACE_RMCast::MT_DATA;
-
- tmp = ACE_HTONL (data.sequence_number);
- ACE_OS::memcpy (header + 1,
- &tmp, sizeof(ACE_UINT32));
- tmp = ACE_HTONL (data.total_size);
- ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
- &tmp, sizeof(ACE_UINT32));
- tmp = ACE_HTONL (data.fragment_offset);
- ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
- &tmp, sizeof(ACE_UINT32));
-
- iovec iov[IOV_MAX];
- int iovcnt = 1;
-
- iov[0].iov_base = header;
- iov[0].iov_len = sizeof(header);
-
- ACE_Message_Block *mb = data.payload;
-
- for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
- {
- iov[iovcnt].iov_base = i->rd_ptr ();
- iov[iovcnt].iov_len = i->length ();
- iovcnt++;
- if (iovcnt >= IOV_MAX)
- return -1;
- }
-
- ACE_Time_Value tv (0, 10000);
- ACE_OS::sleep (tv);
- if (this->dgram_.send (iov, iovcnt,
- this->mcast_addr_) == -1)
- return -1;
-
-#if 0
- ACE_HEX_DUMP ((LM_DEBUG,
- (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
-#endif
-
- return 0;
-}
-
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.h b/protocols/ace/RMCast/RMCast_UDP_Sender.h
deleted file mode 100644
index 474ebbc7f27..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Sender.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_UDP_Sender.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_UDP_SENDER_H
-#define ACE_RMCAST_UDP_SENDER_H
-#include "ace/pre.h"
-
-#include "RMCast_Module.h"
-#include "ace/SOCK_Dgram.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Export ACE_RMCast_UDP_Sender : public ACE_RMCast_Module
-{
- // = TITLE
- // Reliable Multicast UDP_Sender
- //
- // = DESCRIPTION
- // Implements a Facade to the classes that implement a reliable
- // multicast protocol.
-public:
- // = Initialization and termination methods.
- ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr);
- // Constructor
-
- virtual ~ACE_RMCast_UDP_Sender (void);
- // Destructor
-
- // = The RMCast_Module methods
- virtual int open (void);
- virtual int close (void);
- virtual int put_data (ACE_RMCast::Data &data);
- // Send the Message block, this is the callback invoked at the end
- // of the stack.
-
-protected:
- ACE_SOCK_Dgram dgram_;
- // This is the socket used to send the multicast data.
- // @@ This should be strategized, what if we want to use something
- // like ATM networks to send the data, then the types would be
- // different....
-
- ACE_INET_Addr mcast_addr_;
- // The multicast group we send to.
- // @@ Can we really strategize the addressing, without introducing
- // too much complexity? How can we decouple the reliability aspect
- // from the transport aspects of the system???
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_UDP_Sender.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_UDP_SENDER_H */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.i b/protocols/ace/RMCast/RMCast_UDP_Sender.i
deleted file mode 100644
index cfa1da318d3..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Sender.i
+++ /dev/null
@@ -1 +0,0 @@
-// $Id$
diff --git a/tests/RMCast/RMCast_Fragment_Test.cpp b/tests/RMCast/RMCast_Fragment_Test.cpp
index 36b52255e6f..909bab01f05 100644
--- a/tests/RMCast/RMCast_Fragment_Test.cpp
+++ b/tests/RMCast/RMCast_Fragment_Test.cpp
@@ -23,7 +23,7 @@ class ACE_RMCast_Fragment_Tester
public:
ACE_RMCast_Fragment_Tester (void);
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
virtual int svc (void);
private:
@@ -84,7 +84,7 @@ ACE_RMCast_Fragment_Tester::svc (void)
ACE_RMCast::Data data;
data.payload = &big_blob;
- if (this->fragment_.put_data (data) == -1)
+ if (this->fragment_.data (data) == -1)
return -1;
if (this->received_bytes_ != n)
@@ -126,7 +126,7 @@ ACE_RMCast_Fragment_Tester::svc (void)
ACE_RMCast::Data data;
data.payload = small;
- if (this->fragment_.put_data (data) == -1)
+ if (this->fragment_.data (data) == -1)
return -1;
ACE_UINT32 total = n * size;
@@ -176,7 +176,7 @@ ACE_RMCast_Fragment_Tester::svc (void)
ACE_RMCast::Data data;
data.payload = small;
- if (this->fragment_.put_data (data) == -1)
+ if (this->fragment_.data (data) == -1)
return -1;
if (this->received_bytes_ != total)
@@ -246,7 +246,7 @@ ACE_RMCast_Fragment_Tester::compare (ACE_Message_Block *mb)
}
int
-ACE_RMCast_Fragment_Tester::put_data (ACE_RMCast::Data &data)
+ACE_RMCast_Fragment_Tester::data (ACE_RMCast::Data &data)
{
ACE_UINT32 sequence_number = data.sequence_number;
ACE_UINT32 message_size = data.total_size;
diff --git a/tests/RMCast/RMCast_Reassembly_Test.cpp b/tests/RMCast/RMCast_Reassembly_Test.cpp
index 12b148cae0d..34a262cb617 100644
--- a/tests/RMCast/RMCast_Reassembly_Test.cpp
+++ b/tests/RMCast/RMCast_Reassembly_Test.cpp
@@ -23,7 +23,7 @@ class ACE_RMCast_Reassembly_Tester
public:
ACE_RMCast_Reassembly_Tester (void);
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
virtual int svc (void);
private:
@@ -250,7 +250,7 @@ ACE_RMCast_Reassembly_Tester::compare (ACE_Message_Block *received,
}
int
-ACE_RMCast_Reassembly_Tester::put_data (ACE_RMCast::Data &data)
+ACE_RMCast_Reassembly_Tester::data (ACE_RMCast::Data &data)
{
ACE_Message_Block *mb = data.payload;
@@ -286,7 +286,7 @@ ACE_RMCast_Reassembly_Tester::put_fragment (ACE_UINT32 sequence_number,
data.total_size = total_length;
data.fragment_offset = offset;
data.payload = &p;
- return this->reassembly_.put_data (data);
+ return this->reassembly_.data (data);
}
ACE_UINT32
diff --git a/tests/RMCast/RMCast_Tests.dsw b/tests/RMCast/RMCast_Tests.dsw
index 7471a20220b..f001518bff9 100644
--- a/tests/RMCast/RMCast_Tests.dsw
+++ b/tests/RMCast/RMCast_Tests.dsw
@@ -27,6 +27,18 @@ Package=<4>
###############################################################################
+Project: "RMCast_UDP_Best_Effort_Test"=.\RMCast_UDP_Best_Effort_Test.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Global:
Package=<5>
diff --git a/tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp b/tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp
index 3467d4062d4..2358b3a3acb 100644
--- a/tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp
+++ b/tests/RMCast/RMCast_UDP_Best_Effort_Test.cpp
@@ -11,12 +11,11 @@
// ============================================================================
#include "test_config.h"
-#include "ace/RMCast/RMCast_UDP_Sender.h"
+#include "ace/RMCast/RMCast_IO_UDP.h"
#include "ace/RMCast/RMCast_Fragment.h"
-#include "ace/RMCast/RMCast_UDP_Receiver.h"
-#include "ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h"
-#include "ace/RMCast/RMCast_Sender_Proxy_Factory.h"
+#include "ace/RMCast/RMCast_Module_Factory.h"
+#include "ace/RMCast/RMCast_Fragment.h"
#include "ace/RMCast/RMCast_Reassembly.h"
#include "ace/Task.h"
@@ -24,33 +23,69 @@
ACE_RCSID(tests, RMCast_UDP_Best_Effort_Test, "$Id$")
const size_t message_size = 8 * 1024;
+const int total_message_count = 40;
// ****************************************************************
-class Sender : public ACE_Task_Base
+class Sender_Factory : public ACE_RMCast_Module_Factory
{
public:
- Sender (const ACE_INET_Addr &mcast_group);
+ Sender_Factory (void)
+ {
+ }
+
+ virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *)
+ {
+ return new ACE_RMCast_Reassembly;
+ }
+
+ virtual void destroy (ACE_RMCast_Module *module)
+ {
+ delete module;
+ }
+};
- virtual int svc (void);
+// ****************************************************************
+
+class Receiver_Factory : public ACE_RMCast_Module_Factory
+{
+public:
+ Receiver_Factory (ACE_RMCast_Module *module)
+ : module_ (module)
+ {
+ }
+
+ virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *)
+ {
+ ACE_RMCast_Module *x = new ACE_RMCast_Reassembly;
+ x->next (this->module_);
+ return x;
+ }
+
+ virtual void destroy (ACE_RMCast_Module *module)
+ {
+ delete module;
+ }
private:
- ACE_RMCast_UDP_Sender sender_;
- ACE_RMCast_Fragment fragment_;
+ ACE_RMCast_Module *module_;
};
// ****************************************************************
-class Sender_Proxy_Factory : public ACE_RMCast_Sender_Proxy_Factory
+class Sender : public ACE_Task_Base
{
public:
- Sender_Proxy_Factory (ACE_RMCast_Module *user_module);
+ Sender (const ACE_INET_Addr &mcast_group);
+
+ virtual int svc (void);
- virtual ACE_RMCast_Sender_Proxy *create (void);
- virtual void destroy (ACE_RMCast_Sender_Proxy *);
-
private:
- ACE_RMCast_Module *user_module_;
+ Sender_Factory factory_;
+ ACE_RMCast_IO_UDP io_udp_;
+ ACE_RMCast_Fragment fragment_;
+
+ ACE_INET_Addr mcast_group_;
};
// ****************************************************************
@@ -60,16 +95,22 @@ class Receiver : public ACE_RMCast_Module
public:
Receiver (const ACE_INET_Addr &mcast_group);
+ void dump (void);
+ // Print the results of the test
+
int handle_events (ACE_Time_Value *tv);
// Invoke the UDP Receiver handle_events function
virtual int open (void);
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
- Sender_Proxy_Factory factory_;
- ACE_RMCast_UDP_Receiver udp_receiver_;
+ Receiver_Factory factory_;
+ ACE_RMCast_IO_UDP io_udp_;
+
ACE_INET_Addr mcast_group_;
+
+ int message_count_;
};
// ****************************************************************
@@ -114,65 +155,38 @@ main (int, ACE_TCHAR *[])
if (ACE_Thread_Manager::instance ()->wait () != 0)
ACE_ERROR_RETURN ((LM_ERROR, "Error in Thread_Manager::wait\n"), 1);
+ receiver.dump ();
+
ACE_END_TEST;
return 0;
}
// ****************************************************************
-Sender_Proxy_Factory::Sender_Proxy_Factory (ACE_RMCast_Module *m)
- : user_module_ (m)
-{
-}
-
-ACE_RMCast_Sender_Proxy *
-Sender_Proxy_Factory::create (void)
-{
- ACE_RMCast_Module *top =
- new ACE_RMCast_Reassembly;
- top->next (this->user_module_);
-
- ACE_RMCast_Sender_Proxy *proxy =
- new ACE_RMCast_Sender_Proxy_Best_Effort (top);
-
- ACE_DEBUG ((LM_DEBUG, "Created proxy = %x\n", long(proxy)));
- return proxy;
-}
-
-void
-Sender_Proxy_Factory::destroy (ACE_RMCast_Sender_Proxy *proxy)
-{
- ACE_RMCast_Module *module = proxy->module ();
- delete module;
- delete proxy;
- ACE_DEBUG ((LM_DEBUG, "Destroyed proxy = %x\n", long(proxy)));
-}
-
-// ****************************************************************
-
Receiver::Receiver (const ACE_INET_Addr &mcast_group)
: factory_ (this)
- , udp_receiver_ (&factory_)
+ , io_udp_ (&factory_)
, mcast_group_ (mcast_group)
+ , message_count_ (0)
{
}
int
Receiver::handle_events (ACE_Time_Value *tv)
{
- return this->udp_receiver_.handle_events (tv);
+ return this->io_udp_.handle_events (tv);
}
int
Receiver::open (void)
{
- if (this->udp_receiver_.subscribe (this->mcast_group_) != 0)
- ACE_ERROR_RETURN ((LM_ERROR, "Error subscribing routine\n"), -1);
+ if (this->io_udp_.subscribe (this->mcast_group_) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "Error in IO_UDP::subscribe\n"), -1);
return 0;
}
int
-Receiver::put_data (ACE_RMCast::Data &data)
+Receiver::data (ACE_RMCast::Data &data)
{
if (data.total_size != message_size)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -198,25 +212,37 @@ Receiver::put_data (ACE_RMCast::Data &data)
long(j - data.payload->rd_ptr ())), -1);
}
+ this->message_count_++;
+
return 0;
}
+void
+Receiver::dump (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "Message count = %d/%d",
+ this->message_count_,
+ total_message_count));
+}
+
// ****************************************************************
Sender::Sender (const ACE_INET_Addr &mcast_group)
- : sender_ (mcast_group)
+ : io_udp_ (&factory_)
+ , mcast_group_ (mcast_group)
{
}
int
Sender::svc ()
{
- if (this->sender_.open () != 0)
- ACE_ERROR ((LM_ERROR, "Error in Sender::open()\n"));
-
- if (this->fragment_.next (&this->sender_) != 0)
+ if (this->fragment_.next (&this->io_udp_) != 0)
ACE_ERROR ((LM_ERROR, "Error in Fragment::next()\n"));
+ if (this->io_udp_.subscribe (this->mcast_group_) != 0)
+ ACE_ERROR ((LM_ERROR, "Error in IO_UDP::subscribe()\n"));
+
ACE_Message_Block big_blob (message_size);
big_blob.wr_ptr (message_size);
@@ -226,12 +252,12 @@ Sender::svc ()
*j = filler++;
}
- for (int i = 0; i != 20; ++i)
+ for (int i = 0; i != total_message_count; ++i)
{
ACE_RMCast::Data data;
data.sequence_number = i;
data.payload = &big_blob;
- this->fragment_.put_data (data);
+ this->fragment_.data (data);
}
return 0;
}
diff --git a/tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp b/tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp
new file mode 100644
index 00000000000..3771278b950
--- /dev/null
+++ b/tests/RMCast/RMCast_UDP_Best_Effort_Test.dsp
@@ -0,0 +1,96 @@
+# Microsoft Developer Studio Project File - Name="RMCast_UDP_Best_Effort_Test" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=RMCast_UDP_Best_Effort_Test - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_UDP_Best_Effort_Test.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "RMCast_UDP_Best_Effort_Test.mak" CFG="RMCast_UDP_Best_Effort_Test - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "RMCast_UDP_Best_Effort_Test - Win32 Release" (based on "Win32 (x86) Console Application")
+!MESSAGE "RMCast_UDP_Best_Effort_Test - Win32 Debug" (based on "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "RMCast_UDP_Best_Effort_Test - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /GX /O2 /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MD /W3 /GX /O2 /I "..\.." /I ".." /D "WIN32" /D "NDEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /d "NDEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /machine:I386
+# ADD LINK32 ace.lib ACE_RMCast.lib /nologo /subsystem:console /machine:I386 /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ELSEIF "$(CFG)" == "RMCast_UDP_Best_Effort_Test - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "Debug"
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\.." /I ".." /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 ACE_RMCastd.lib aced.lib /nologo /subsystem:console /debug /machine:I386 /out:"RMCast_UDP_Best_Effort_Test.exe" /pdbtype:sept /libpath:"..\..\ace" /libpath:"..\..\ace\RMCast"
+
+!ENDIF
+
+# Begin Target
+
+# Name "RMCast_UDP_Best_Effort_Test - Win32 Release"
+# Name "RMCast_UDP_Best_Effort_Test - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Best_Effort_Test.cpp
+# End Source File
+# End Group
+# End Target
+# End Project