diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-10 00:06:30 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-10-10 00:06:30 +0000 |
commit | b309b48ea1c3e382ec5296fbdbf249bf7d8c4a10 (patch) | |
tree | ce5a6138f080446ab83584240453558fc8db4a08 /protocols | |
parent | 687ad5454f7be7f4269f99d7f564ecf409110588 (diff) | |
download | ATCD-b309b48ea1c3e382ec5296fbdbf249bf7d8c4a10.tar.gz |
ChangeLogTag:Mon Oct 09 16:45:00 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols')
33 files changed, 792 insertions, 84 deletions
diff --git a/protocols/ace/RMCast/Makefile b/protocols/ace/RMCast/Makefile index 868795fa82c..fb270696c5d 100644 --- a/protocols/ace/RMCast/Makefile +++ b/protocols/ace/RMCast/Makefile @@ -19,10 +19,16 @@ FILES= \ RMCast_Membership \ RMCast_Retransmission \ RMCast_Reordering \ + RMCast_Reliable_Factory \ + RMcast_Singleton_Factory \ + RMCast_Control_Splitter \ + RMcast_Resend_Handler \ \ RMCast_IO_UDP \ RMCast_UDP_Event_Handler \ - RMCast_UDP_Proxy + RMCast_UDP_Proxy \ + RMCast_UDP_Reliable_Receiver \ + RMCast_UDP_Reliable_Sender #---------------------------------------------------------------------------- # Include macros and targets diff --git a/protocols/ace/RMCast/RMCast.dsp b/protocols/ace/RMCast/RMCast.dsp index d160812fee6..e6cbc7af35a 100644 --- a/protocols/ace/RMCast/RMCast.dsp +++ b/protocols/ace/RMCast/RMCast.dsp @@ -98,6 +98,10 @@ SOURCE=.\RMCast.cpp # End Source File
# Begin Source File
+SOURCE=.\RMCast_Control_Splitter.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Fragment.cpp
# End Source File
# Begin Source File
@@ -130,20 +134,40 @@ SOURCE=.\RMCast_Reassembly.cpp # End Source File
# Begin Source File
+SOURCE=.\RMCast_Reliable_Factory.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Reordering.cpp
# End Source File
# Begin Source File
+SOURCE=.\RMCast_Resend_Handler.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Retransmission.cpp
# End Source File
# Begin Source File
+SOURCE=.\RMCast_Singleton_Factory.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_UDP_Event_Handler.cpp
# End Source File
# Begin Source File
SOURCE=.\RMCast_UDP_Proxy.cpp
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Reliable_Receiver.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Reliable_Sender.cpp
+# End Source File
# End Group
# Begin Group "Header Files"
@@ -154,6 +178,10 @@ SOURCE=.\RMCast.h # End Source File
# Begin Source File
+SOURCE=.\RMCast_Control_Splitter.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Export.h
# End Source File
# Begin Source File
@@ -190,20 +218,40 @@ SOURCE=.\RMCast_Reassembly.h # End Source File
# Begin Source File
+SOURCE=.\RMCast_Reliable_Factory.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Reordering.h
# End Source File
# Begin Source File
+SOURCE=.\RMCast_Resend_Handler.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Retransmission.h
# End Source File
# Begin Source File
+SOURCE=.\RMCast_Singleton_Factory.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_UDP_Event_Handler.h
# End Source File
# Begin Source File
SOURCE=.\RMCast_UDP_Proxy.h
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Reliable_Receiver.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Reliable_Sender.h
+# End Source File
# End Group
# Begin Group "Inline Files"
@@ -214,6 +262,10 @@ SOURCE=.\RMCast.i # End Source File
# Begin Source File
+SOURCE=.\RMCast_Control_Splitter.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Fragment.i
# End Source File
# Begin Source File
@@ -246,20 +298,40 @@ SOURCE=.\RMCast_Reassembly.i # End Source File
# Begin Source File
+SOURCE=.\RMCast_Reliable_Factory.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Reordering.i
# End Source File
# Begin Source File
+SOURCE=.\RMCast_Resend_Handler.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Retransmission.i
# End Source File
# Begin Source File
+SOURCE=.\RMCast_Singleton_Factory.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_UDP_Event_Handler.i
# End Source File
# Begin Source File
SOURCE=.\RMCast_UDP_Proxy.i
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Reliable_Receiver.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Reliable_Sender.i
+# End Source File
# End Group
# Begin Group "Template Files"
diff --git a/protocols/ace/RMCast/RMCast_Control_Splitter.cpp b/protocols/ace/RMCast/RMCast_Control_Splitter.cpp new file mode 100644 index 00000000000..b06e9bfc467 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Control_Splitter.cpp @@ -0,0 +1,37 @@ +// $Id$ + +#include "RMCast_Control_Splitter.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_Control_Splitter.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Control_Splitter, "$Id$") + +ACE_RMCast_Control_Splitter::~ACE_RMCast_Control_Splitter (void) +{ +} + +int +ACE_RMCast_Control_Splitter::ack (ACE_RMCast::Ack &ack) +{ + if (this->control_module () != 0) + return this->control_module ()->ack (ack); + return 0; +} + +int +ACE_RMCast_Control_Splitter::join (ACE_RMCast::Join &join) +{ + if (this->control_module () != 0) + return this->control_module ()->join (join); + return 0; +} + +int +ACE_RMCast_Control_Splitter::leave (ACE_RMCast::Leave &leave) +{ + if (this->control_module () != 0) + return this->control_module ()->leave (leave); + return 0; +} diff --git a/protocols/ace/RMCast/RMCast_Control_Splitter.h b/protocols/ace/RMCast/RMCast_Control_Splitter.h new file mode 100644 index 00000000000..d1c0b5df3a0 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Control_Splitter.h @@ -0,0 +1,49 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef ACE_RMCAST_CONTROL_SPLITTER_H +#define ACE_RMCAST_CONTROL_SPLITTER_H +#include "ace/pre.h" + +#include "RMCast_Module.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/// Split control and data messages different modules in the stack +/** + * In some instances the control messages must go a destination + * different from the regular data flow. This class performs this + * separation. + */ +class ACE_RMCast_Export ACE_RMCast_Control_Splitter : public ACE_RMCast_Module +{ +public: + //! Constructor + ACE_RMCast_Control_Splitter (void); + + //! Destructor + virtual ~ACE_RMCast_Control_Splitter (void); + + /// Set the control module, all incoming control messages go to it + void control_module (ACE_RMCast_Module *module); + + /// Return the current control module + ACE_RMCast_Module *control_module (void) const; + + virtual int ack (ACE_RMCast::Ack &); + virtual int join (ACE_RMCast::Join &); + virtual int leave (ACE_RMCast::Leave &); + +private: + /// The control module + ACE_RMCast_Module *control_module_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Control_Splitter.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_CONTROL_SPLITTER_H */ diff --git a/protocols/ace/RMCast/RMCast_Control_Splitter.i b/protocols/ace/RMCast/RMCast_Control_Splitter.i new file mode 100644 index 00000000000..7cc8205fcce --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Control_Splitter.i @@ -0,0 +1,21 @@ +// $Id$ + +ACE_INLINE +ACE_RMCast_Control_Splitter::ACE_RMCast_Control_Splitter (void) + : ACE_RMCast_Module () + , control_module_ (0) +{ +} + +ACE_INLINE void +ACE_RMCast_Control_Splitter::control_module (ACE_RMCast_Module *module) +{ + this->control_module_ = module; +} + + +ACE_INLINE ACE_RMCast_Module * +ACE_RMCast_Control_Splitter::control_module (void) const +{ + return this->control_module_; +} diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp index eeb3b9422c2..39bf16f88a2 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.cpp @@ -87,6 +87,16 @@ ACE_RMCast_Copy_On_Write<KEY,ITEM,C,ITERATOR>::first_key (void) return (*begin).key (); } +template<class KEY, class ITEM, class C, class ITERATOR> int +ACE_RMCast_Copy_On_Write<KEY,ITEM,C,ITERATOR>::empty (void) +{ + Read_Guard ace_mon (*this); + ITERATOR end = ace_mon.collection->collection.end (); + ITERATOR begin = ace_mon.collection->collection.begin (); + + return end == begin; +} + template<class KEY, class ITEM, class C, class I> int ACE_RMCast_Copy_On_Write<KEY,ITEM,C,I>::bind (KEY const & k, ITEM const & i) diff --git a/protocols/ace/RMCast/RMCast_Copy_On_Write.h b/protocols/ace/RMCast/RMCast_Copy_On_Write.h index d3513a05d2d..605970c7c85 100644 --- a/protocols/ace/RMCast/RMCast_Copy_On_Write.h +++ b/protocols/ace/RMCast/RMCast_Copy_On_Write.h @@ -181,6 +181,9 @@ public: //! Get the first key KEY first_key (void); + /// Return non-zero if the collection is empty + int empty (void); + //! Add a new element int bind (KEY const & key, ITEM const & item); diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp index 421982d5ad6..4fe1488bad0 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.cpp +++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp @@ -20,6 +20,19 @@ ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void) } int +ACE_RMCast_IO_UDP::open (const ACE_INET_Addr &mcast_group, + const ACE_Addr &local, + int protocol_family, + int protocol, + int reuse_addr) +{ + this->mcast_group_ = mcast_group; + + ACE_SOCK_Dgram &dgram = this->dgram_; + return dgram.open (local, protocol_family, protocol, reuse_addr); +} + +int ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr, int reuse_addr, const ACE_TCHAR *net_if, @@ -65,28 +78,6 @@ ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv) } int -ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor) -{ - this->eh_.reactor (reactor); - return reactor->register_handler (&this->eh_, - ACE_Event_Handler::READ_MASK); -} - -int -ACE_RMCast_IO_UDP::remove_handlers (void) -{ - ACE_Reactor *r = this->eh_.reactor (); - if (r != 0) - { - r->remove_handler (&this->eh_, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - this->eh_.reactor (0); - } - return 0; -} - -int ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE) { // @@ We should use a system constant instead of this literal @@ -102,7 +93,7 @@ ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE) // @@ LOG?? ACE_DEBUG ((LM_DEBUG, "RMCast_IO_UDP::handle_input () - " - "error in recv\n")); + "error in recv %p\n", "")); return -1; } @@ -156,7 +147,7 @@ ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE) // The message type is valid, we must create a new proxy, // initially in the JOINING state... - ACE_RMCast_Module *module = this->factory_->create (this); + ACE_RMCast_Module *module = this->factory_->create (); if (module == 0) { // @@ LOG?? diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.h b/protocols/ace/RMCast/RMCast_IO_UDP.h index 5af403bf994..e68b360576d 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.h +++ b/protocols/ace/RMCast/RMCast_IO_UDP.h @@ -15,7 +15,6 @@ #include "ace/pre.h" #include "RMCast_Module.h" -#include "RMCast_UDP_Event_Handler.h" #include "ace/SOCK_Dgram_Mcast.h" #include "ace/Hash_Map_Manager.h" #include "ace/Synch.h" @@ -27,7 +26,6 @@ class ACE_RMCast_UDP_Proxy; class ACE_RMCast_Module_Factory; -class ACE_Reactor; class ACE_Time_Value; class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module @@ -46,6 +44,20 @@ public: //! Destructor ~ACE_RMCast_IO_UDP (void); + /// Open the internal socket, but only to send multicast data. + /** + * It is not clear to me if this is a valid configuration. Maybe it + * would be a problem to expose two separate, incompatible + * interfaces (check the subscribe() method). However, the + * alternative would be to implement almost identical class for + * outgoing and incoming UDP I/O + */ + int open (const ACE_INET_Addr &mcast_group, + const ACE_Addr &local, + int protocol_family = PF_INET, + int protocol = 0, + int reuse_addr = 0); + //! Join a new multicast group /*! * Start receiving data for the <mcast_addr> multicast group. @@ -65,20 +77,6 @@ public: //! forever. int handle_events (ACE_Time_Value *tv = 0); - //! Register any event handlers into <reactor> - /*! - * @@TODO: This should be left for the clients of the class, there - * is no reason why this class must know about reactors. - */ - int register_handlers (ACE_Reactor *reactor); - - //! Remove all the handlers from the reactor - /*! - * @@TODO: This should be left for the clients of the class, there - * is no reason why this class must know about reactors. - */ - int remove_handlers (void); - //! There is data to read, read it and process it. int handle_input (ACE_HANDLE h); @@ -124,9 +122,6 @@ private: typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map; //! The collection of proxies Map map_; - - //! The event handler adapter - ACE_RMCast_UDP_Event_Handler eh_; }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.i b/protocols/ace/RMCast/RMCast_IO_UDP.i index ddacc5694ad..f542e85df70 100644 --- a/protocols/ace/RMCast/RMCast_IO_UDP.i +++ b/protocols/ace/RMCast/RMCast_IO_UDP.i @@ -4,6 +4,5 @@ ACE_INLINE ACE_RMCast_IO_UDP:: ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory) : factory_ (factory) - , eh_ (this) { } diff --git a/protocols/ace/RMCast/RMCast_Module.cpp b/protocols/ace/RMCast/RMCast_Module.cpp index 632d905f900..08ff971f039 100644 --- a/protocols/ace/RMCast/RMCast_Module.cpp +++ b/protocols/ace/RMCast/RMCast_Module.cpp @@ -30,21 +30,6 @@ ACE_RMCast_Module::next (void) const } int -ACE_RMCast_Module::prev (ACE_RMCast_Module *prev) -{ - if (this->prev_ != 0 && prev != 0) - return 1; - this->prev_ = prev; - return 0; -} - -ACE_RMCast_Module * -ACE_RMCast_Module::prev (void) const -{ - return this->prev_; -} - -int ACE_RMCast_Module::open (void) { return 0; diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h index fad76caac53..d362dac2366 100644 --- a/protocols/ace/RMCast/RMCast_Module.h +++ b/protocols/ace/RMCast/RMCast_Module.h @@ -48,12 +48,6 @@ public: //! Accesor for the next element in the stack virtual ACE_RMCast_Module* next (void) const; - //! Modifier for the previous element in the stack - virtual int prev (ACE_RMCast_Module *prev); - - //! Accesor for the previous element in the stack - virtual ACE_RMCast_Module* prev (void) const; - //! Initialize the module, setting up the next module virtual int open (void); @@ -84,9 +78,6 @@ public: private: //! The next element in the stack ACE_RMCast_Module *next_; - - //! The previous element in the stack - ACE_RMCast_Module *prev_; }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_Module.i b/protocols/ace/RMCast/RMCast_Module.i index 61099903d20..af299d706e3 100644 --- a/protocols/ace/RMCast/RMCast_Module.i +++ b/protocols/ace/RMCast/RMCast_Module.i @@ -3,6 +3,5 @@ ACE_INLINE ACE_RMCast_Module::ACE_RMCast_Module (void) : next_ (0) - , prev_ (0) { } diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h index f0ea58df0e5..97e6ce29235 100644 --- a/protocols/ace/RMCast/RMCast_Module_Factory.h +++ b/protocols/ace/RMCast/RMCast_Module_Factory.h @@ -52,7 +52,7 @@ public: virtual ~ACE_RMCast_Module_Factory (void); //! Create a new proxy - virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0; + virtual ACE_RMCast_Module *create (void) = 0; //! Destroy a proxy /*! diff --git a/protocols/ace/RMCast/RMCast_Reliable_Factory.cpp b/protocols/ace/RMCast/RMCast_Reliable_Factory.cpp new file mode 100644 index 00000000000..b06b9d0953d --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reliable_Factory.cpp @@ -0,0 +1,46 @@ +// $Id$ + +#include "RMCast_Reliable_Factory.h" +#include "RMCast_Reassembly.h" +#include "RMCast_Reordering.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_Reliable_Factory.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Reliable_Factory, "$Id$") + +ACE_RMCast_Reliable_Factory::~ACE_RMCast_Reliable_Factory (void) +{ +} + +ACE_RMCast_Module* +ACE_RMCast_Reliable_Factory::create (void) +{ + ACE_RMCast_Module *reassembly; + ACE_NEW_RETURN (reassembly, ACE_RMCast_Reassembly, 0); + + ACE_RMCast_Module *reordering; + ACE_NEW_RETURN (reordering, ACE_RMCast_Reordering, 0); + + ACE_RMCast_Module *user = this->factory_->create (); + if (user == 0) + { + delete reordering; + delete reassembly; + return 0; + } + reassembly->next (reordering); + reordering->next (user); + return reassembly; +} + +void +ACE_RMCast_Reliable_Factory::destroy (ACE_RMCast_Module *reassembly) +{ + ACE_RMCast_Module *reordering = reassembly->next (); + ACE_RMCast_Module *user = reordering->next (); + this->factory_->destroy (user); + delete reordering; + delete reassembly; +} diff --git a/protocols/ace/RMCast/RMCast_Reliable_Factory.h b/protocols/ace/RMCast/RMCast_Reliable_Factory.h new file mode 100644 index 00000000000..e41c7fd9e74 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reliable_Factory.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// RMCast_Reliable_Factory.h +// +// = AUTHOR +// Carlos O'Ryan <coryan@uci.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_RELIABLE_FACTORY_H +#define ACE_RMCAST_RELIABLE_FACTORY_H +#include "ace/pre.h" + +#include "RMCast_Module_Factory.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/// Implement an ACE_RMCast_Module_Factory that "creates" a single +/// object. +/** + * Many applications (and even some internal components), will use a + * single ACE_RMCast_Module to process all the events, for example, a + * receiver may decide to use the same ACE_RMCast_Module to process + * all incoming events, instead of using one per remote sender. + */ +class ACE_RMCast_Export ACE_RMCast_Reliable_Factory : public ACE_RMCast_Module_Factory +{ +public: + /// Constructor + /** + * The create() method will return always \param reliable. + */ + ACE_RMCast_Reliable_Factory (ACE_RMCast_Module_Factory *factory); + + //! Destructor + virtual ~ACE_RMCast_Reliable_Factory (void); + + virtual ACE_RMCast_Module *create (void); + virtual void destroy (ACE_RMCast_Module *); + +private: + /// Delegate on another factory to create the user module + ACE_RMCast_Module_Factory *factory_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Reliable_Factory.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_RELIABLE_FACTORY_H */ diff --git a/protocols/ace/RMCast/RMCast_Reliable_Factory.i b/protocols/ace/RMCast/RMCast_Reliable_Factory.i new file mode 100644 index 00000000000..47ba22754ec --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Reliable_Factory.i @@ -0,0 +1,8 @@ +// $Id$ + +ACE_INLINE +ACE_RMCast_Reliable_Factory:: + ACE_RMCast_Reliable_Factory (ACE_RMCast_Module_Factory *factory) + : factory_ (factory) +{ +} diff --git a/protocols/ace/RMCast/RMCast_Resend_Handler.cpp b/protocols/ace/RMCast/RMCast_Resend_Handler.cpp new file mode 100644 index 00000000000..130a1d6b5ca --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Resend_Handler.cpp @@ -0,0 +1,22 @@ +// $Id$ + +#include "RMCast_Resend_Handler.h" +#include "RMCast_Retransmission.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_Resend_Handler.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Resend_Handler, "$Id$") + +ACE_RMCast_Resend_Handler::~ACE_RMCast_Resend_Handler (void) +{ +} + +int +ACE_RMCast_Resend_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + (void) this->retransmission_->resend (0 /* @@ TODO */); + return 0; +} diff --git a/protocols/ace/RMCast/RMCast_Resend_Handler.h b/protocols/ace/RMCast/RMCast_Resend_Handler.h new file mode 100644 index 00000000000..0d2ec0fe13c --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Resend_Handler.h @@ -0,0 +1,44 @@ +// $Id$ + +#ifndef ACE_RMCAST_RESEND_HANDLER_H +#define ACE_RMCAST_RESEND_HANDLER_H +#include "ace/pre.h" + +#include "RMCast_Export.h" +#include "ace/Event_Handler.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_RMCast_Retransmission; + +/// Implement an adapter to resend messages in the +/// ACE_RMCast_Retransmission layer, but based on Reactor based +/// timeouts. +class ACE_RMCast_Export ACE_RMCast_Resend_Handler : public ACE_Event_Handler +{ +public: + /// Constructor, save io_udp as the Adaptee in the Adapter pattern. + ACE_RMCast_Resend_Handler (ACE_RMCast_Retransmission *retransmission); + + /// Destructor + ~ACE_RMCast_Resend_Handler (void); + + //@{ + //! Documented in ACE_Event_Handler class + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); + //@} + +private: + //! The adaptee + ACE_RMCast_Retransmission *retransmission_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Resend_Handler.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_RESEND_HANDLER_H */ diff --git a/protocols/ace/RMCast/RMCast_Resend_Handler.i b/protocols/ace/RMCast/RMCast_Resend_Handler.i new file mode 100644 index 00000000000..9ad6cd8870d --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Resend_Handler.i @@ -0,0 +1,8 @@ +// $Id$ + +ACE_INLINE +ACE_RMCast_Resend_Handler:: +ACE_RMCast_Resend_Handler (ACE_RMCast_Retransmission *r) + : retransmission_ (r) +{ +} diff --git a/protocols/ace/RMCast/RMCast_Retransmission.cpp b/protocols/ace/RMCast/RMCast_Retransmission.cpp index c9db70cbf63..a23829cd54e 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.cpp +++ b/protocols/ace/RMCast/RMCast_Retransmission.cpp @@ -67,6 +67,26 @@ ACE_RMCast_Retransmission::resend (ACE_UINT32 max_sequence_number) } int +ACE_RMCast_Retransmission::resend_all (void) +{ + if (this->next () == 0) + return 0; + + ACE_RMCast_Resend_Worker worker (this->next (), ACE_UINT32_MAX); + + if (this->messages_.for_each (&worker) == -1) + return -1; + + return worker.n; +} + +int +ACE_RMCast_Retransmission::has_data (void) +{ + return !this->messages_.empty (); +} + +int ACE_RMCast_Retransmission::close (void) { // @@ diff --git a/protocols/ace/RMCast/RMCast_Retransmission.h b/protocols/ace/RMCast/RMCast_Retransmission.h index b7bc20d2914..f49c0a73f9c 100644 --- a/protocols/ace/RMCast/RMCast_Retransmission.h +++ b/protocols/ace/RMCast/RMCast_Retransmission.h @@ -59,6 +59,15 @@ public: */ int resend (ACE_UINT32 max_sequence_number); + //! Resend all messages + /*! + * Resends all the messages currently in the queue. + */ + int resend_all (void); + + /// Return 0 if there is no pending data to send + int has_data (void); + //! Cleanup all the stored messages virtual int close (void); diff --git a/protocols/ace/RMCast/RMCast_Singleton_Factory.cpp b/protocols/ace/RMCast/RMCast_Singleton_Factory.cpp new file mode 100644 index 00000000000..403b87060e2 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Singleton_Factory.cpp @@ -0,0 +1,24 @@ +// $Id$ + +#include "RMCast_Singleton_Factory.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_Singleton_Factory.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Singleton_Factory, "$Id$") + +ACE_RMCast_Singleton_Factory::~ACE_RMCast_Singleton_Factory (void) +{ +} + +ACE_RMCast_Module* +ACE_RMCast_Singleton_Factory::create (void) +{ + return this->singleton_; +} + +void +ACE_RMCast_Singleton_Factory::destroy (ACE_RMCast_Module *) +{ +} diff --git a/protocols/ace/RMCast/RMCast_Singleton_Factory.h b/protocols/ace/RMCast/RMCast_Singleton_Factory.h new file mode 100644 index 00000000000..5e8b27a8e0b --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Singleton_Factory.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// RMCast_Singleton_Factory.h +// +// = AUTHOR +// Carlos O'Ryan <coryan@uci.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_SINGLETON_FACTORY_H +#define ACE_RMCAST_SINGLETON_FACTORY_H +#include "ace/pre.h" + +#include "RMCast_Module_Factory.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/// Implement an ACE_RMCast_Module_Factory that "creates" a single +/// object. +/** + * Many applications (and even some internal components), will use a + * single ACE_RMCast_Module to process all the events, for example, a + * receiver may decide to use the same ACE_RMCast_Module to process + * all incoming events, instead of using one per remote sender. + */ +class ACE_RMCast_Export ACE_RMCast_Singleton_Factory : public ACE_RMCast_Module_Factory +{ +public: + /// Constructor + /** + * The create() method will return always \param singleton. + */ + ACE_RMCast_Singleton_Factory (ACE_RMCast_Module *singleton); + + //! Destructor + virtual ~ACE_RMCast_Singleton_Factory (void); + + virtual ACE_RMCast_Module *create (void); + virtual void destroy (ACE_RMCast_Module *); + +private: + /// The singleton object + ACE_RMCast_Module *singleton_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Singleton_Factory.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_SINGLETON_FACTORY_H */ diff --git a/protocols/ace/RMCast/RMCast_Singleton_Factory.i b/protocols/ace/RMCast/RMCast_Singleton_Factory.i new file mode 100644 index 00000000000..a45837dfa93 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Singleton_Factory.i @@ -0,0 +1,8 @@ +// $Id$ + +ACE_INLINE +ACE_RMCast_Singleton_Factory:: + ACE_RMCast_Singleton_Factory (ACE_RMCast_Module *singleton) + : singleton_ (singleton) +{ +} diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp index e5ff8da2761..963928845f7 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp +++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp @@ -9,7 +9,7 @@ # include "RMCast_UDP_Event_Handler.i" #endif /* ! __ACE_INLINE__ */ -ACE_RCSID(ace, RMCast_Event_Handler, "$Id$") +ACE_RCSID(ace, RMCast_UDP_Event_Handler, "$Id$") ACE_RMCast_UDP_Event_Handler::~ACE_RMCast_UDP_Event_Handler (void) { diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.h b/protocols/ace/RMCast/RMCast_UDP_Proxy.h index 02d4e3409a9..f1a7f77a25a 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Proxy.h +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.h @@ -28,28 +28,41 @@ class ACE_RMCast_IO_UDP; +/// Define the proxy implementation for UDP based communication +/** + * Proxy objects are transport specific, they are responsible for + * storing the remote peer addressing information. + * This class implements the UDP version of a proxy. + */ class ACE_RMCast_Export ACE_RMCast_UDP_Proxy : public ACE_RMCast_Proxy { - // = TITLE - // Proxy for UDP-based RMCast - // - // = DESCRIPTION - // public: + /// Constructor + /** + * The \param io_udp argument is kept to send the replys through the + * right socket. + * The \param peer_addr is the address used byu the peer to receive + * responses. + */ ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp, const ACE_INET_Addr &peer_addr); - // Constructor + /// Destructor virtual ~ACE_RMCast_UDP_Proxy (void); - // Destructor + /// Receive a message, parse and send it upstream in the right + /// format. int receive_message (char *buffer, size_t size); - // Receive the message + /// Make the peer address available const ACE_INET_Addr &peer_addr (void) const; - // The address of the peer - // = The ACE_RMCast_Proxy methods + //@{ + /** + * Implement the ACE_RMCast_Proxy methods, in this case we use the + * \param io_udp_ object to send the data, using the address of our + * remote peer. + */ virtual int reply_data (ACE_RMCast::Data &); virtual int reply_poll (ACE_RMCast::Poll &); virtual int reply_ack_join (ACE_RMCast::Ack_Join &); @@ -57,13 +70,14 @@ public: virtual int reply_ack (ACE_RMCast::Ack &); virtual int reply_join (ACE_RMCast::Join &); virtual int reply_leave (ACE_RMCast::Leave &); + //@} private: + /// The IO facade ACE_RMCast_IO_UDP *io_udp_; - // The IO facade + /// The remote peer's address ACE_INET_Addr peer_addr_; - // The address of the peer }; #if defined (__ACE_INLINE__) diff --git a/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.cpp b/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.cpp new file mode 100644 index 00000000000..8a99b4cee6b --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.cpp @@ -0,0 +1,31 @@ +// $Id$ + +#include "RMCast_UDP_Reliable_Receiver.h" +#include "RMCast_UDP_Event_Handler.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_UDP_Reliable_Receiver.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_UDP_Reliable_Receiver, "$Id$") + +ACE_RMCast_UDP_Reliable_Receiver::ACE_RMCast_UDP_Reliable_Receiver (ACE_RMCast_Module *user_module) + : user_factory_ (user_module) + , factory_ (&user_factory_) + , io_udp_ (&factory_) +{ +} + +ACE_RMCast_UDP_Reliable_Receiver::~ACE_RMCast_UDP_Reliable_Receiver (void) +{ +} + +void +ACE_RMCast_UDP_Reliable_Receiver::reactive_incoming_messages (ACE_Reactor *reactor) +{ + ACE_RMCast_UDP_Event_Handler *eh; + ACE_NEW (eh, ACE_RMCast_UDP_Event_Handler (&this->io_udp_)); + + /// @@ TODO Make sure it is removed from the Reactor at some point + (void) reactor->register_handler (eh, ACE_Event_Handler::READ_MASK); +} diff --git a/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.h b/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.h new file mode 100644 index 00000000000..2a580afbed8 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.h @@ -0,0 +1,49 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef ACE_RMCAST_UDP_RELIABLE_RECEIVER_H +#define ACE_RMCAST_UDP_RELIABLE_RECEIVER_H +#include "ace/pre.h" + +#include "RMCast_Singleton_Factory.h" +#include "RMCast_Reliable_Factory.h" +#include "RMCast_IO_UDP.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_Reactor; + +class ACE_RMCast_Export ACE_RMCast_UDP_Reliable_Receiver : public ACE_RMCast_Module +{ +public: + /// Constructor + ACE_RMCast_UDP_Reliable_Receiver (ACE_RMCast_Module *user_control); + + /// Destructor + virtual ~ACE_RMCast_UDP_Reliable_Receiver (void); + + /// Open the UDP I/O module. + int open (const ACE_INET_Addr &mcast_group); + + /// Use the reactor to handle incoming messages + void reactive_incoming_messages (ACE_Reactor *reactor); + +private: + /// All the proxys give their messages to user module + ACE_RMCast_Singleton_Factory user_factory_; + + /// This factory creates the per-proxy stack + ACE_RMCast_Reliable_Factory factory_; + + /// Handle all the UDP I/O + ACE_RMCast_IO_UDP io_udp_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_UDP_Reliable_Receiver.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_UDP_RELIABLE_RECEIVER_H */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.i b/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.i new file mode 100644 index 00000000000..29a212adfeb --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Reliable_Receiver.i @@ -0,0 +1,7 @@ +// $Id$ + +ACE_INLINE int +ACE_RMCast_UDP_Reliable_Receiver::open (const ACE_INET_Addr &mcast_group) +{ + return this->io_udp_.subscribe (mcast_group); +} diff --git a/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.cpp b/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.cpp new file mode 100644 index 00000000000..4659050ebb3 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.cpp @@ -0,0 +1,66 @@ +// $Id$ + +#include "RMCast_UDP_Reliable_Sender.h" +#include "RMCast_UDP_Event_Handler.h" +#include "RMCast_Resend_Handler.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_UDP_Reliable_Sender.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_UDP_Reliable_Sender, "$Id$") + +ACE_RMCast_UDP_Reliable_Sender::ACE_RMCast_UDP_Reliable_Sender (ACE_RMCast_Module *user_control) + : user_control_ (user_control) + + // We use a singleton factory, all proxys send their messages to the + // retransmission module. There should be only control messages + // coming back, so this is OK. + , factory_ (&retransmission_) + , io_udp_ (&factory_) +{ + // Messages are passed down to the retransmission module. + this->next (&this->retransmission_); + + // Then to the splitter, at this point control messages are sent + // back to the user, other messages continue down to the + // fragmentation layer. + this->retransmission_.next (&this->splitter_); + this->splitter_.next (&this->fragment_); + this->splitter_.control_module (this->user_control_); + + // The fragmentation layer delegates all messages to the UDP I/O + // module, that sends every message back to the application. + this->fragment_.next (&this->io_udp_); +} + +ACE_RMCast_UDP_Reliable_Sender::~ACE_RMCast_UDP_Reliable_Sender (void) +{ +} + +void +ACE_RMCast_UDP_Reliable_Sender::reactive_incoming_messages (ACE_Reactor *reactor) +{ + ACE_RMCast_UDP_Event_Handler *eh; + ACE_NEW (eh, ACE_RMCast_UDP_Event_Handler (&this->io_udp_)); + + /// @@ TODO Make sure it is removed from the Reactor at some point + (void) reactor->register_handler (eh, ACE_Event_Handler::READ_MASK); +} + +void +ACE_RMCast_UDP_Reliable_Sender::reactive_resends (ACE_Reactor *reactor, + const ACE_Time_Value &period) +{ + ACE_RMCast_Resend_Handler *eh; + ACE_NEW (eh, ACE_RMCast_Resend_Handler (&this->retransmission_)); + + /// @@ TODO make sure it is removed from the Reactor at some point + (void) reactor->schedule_timer (eh, 0, period, period); +} + +int +ACE_RMCast_UDP_Reliable_Sender::has_data (void) +{ + return this->retransmission_.has_data (); +} diff --git a/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.h b/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.h new file mode 100644 index 00000000000..3269c7cb304 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.h @@ -0,0 +1,67 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef ACE_RMCAST_UDP_RELIABLE_SENDER_H +#define ACE_RMCAST_UDP_RELIABLE_SENDER_H +#include "ace/pre.h" + +#include "RMCast_Singleton_Factory.h" +#include "RMCast_IO_UDP.h" +#include "RMCast_Retransmission.h" +#include "RMCast_Fragment.h" +#include "RMCast_Control_Splitter.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_Reactor; + +class ACE_RMCast_Export ACE_RMCast_UDP_Reliable_Sender : public ACE_RMCast_Module +{ +public: + /// Constructor + ACE_RMCast_UDP_Reliable_Sender (ACE_RMCast_Module *user_control); + + /// Destructor + virtual ~ACE_RMCast_UDP_Reliable_Sender (void); + + /// Open the UDP I/O module. + int open (const ACE_INET_Addr &mcast_group); + + /// Use the reactor to handle incoming messages + void reactive_incoming_messages (ACE_Reactor *reactor); + + /// Use the reactor to periodically resend messages + void reactive_resends (ACE_Reactor *reactor, + const ACE_Time_Value &period); + + /// Check if there is still some messages to send, return 0 if not. + int has_data (void); + +private: + /// The application-level control module + ACE_RMCast_Module *user_control_; + + /// The retransmission module + ACE_RMCast_Retransmission retransmission_; + + /// All the proxys give their messages to the retransmission module + ACE_RMCast_Singleton_Factory factory_; + + /// Handle all the UDP I/O + ACE_RMCast_IO_UDP io_udp_; + + /// The fragmentation module + ACE_RMCast_Fragment fragment_; + + /// Redirect control messages to the user supplied module + ACE_RMCast_Control_Splitter splitter_; +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_UDP_Reliable_Sender.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_UDP_RELIABLE_SENDER_H */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.i b/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.i new file mode 100644 index 00000000000..bab679aa0f2 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Reliable_Sender.i @@ -0,0 +1,7 @@ +// $Id$ + +ACE_INLINE int +ACE_RMCast_UDP_Reliable_Sender::open (const ACE_INET_Addr &mcast_group) +{ + return this->io_udp_.open (mcast_group, ACE_Addr::sap_any); +} |