diff options
Diffstat (limited to 'protocols/ace/RMCast')
33 files changed, 937 insertions, 715 deletions
diff --git a/protocols/ace/RMCast/README b/protocols/ace/RMCast/README new file mode 100644 index 00000000000..2dd0c5d9cfc --- /dev/null +++ b/protocols/ace/RMCast/README @@ -0,0 +1,57 @@ +# $Id$ + + This directory will contain a simple, small-scale reliable +multicast framework for ACE. The framework is based on the ASX +components of the ACE library: the protocol is implemented as a stack +of interchangeable "modules", each one in charge of a very small task. +For example, one module implements fragmentation and reassembly, other +modules implement retransmission, send ACK and NAK messages, and +maintain receiver membership. + + The modules are replaced to achieve different levels of +reliability. For example, the retransmission module can be either the +"Best_Effort", "Semi_Reliable" or "Reliable" implementation. In the +first case no retransmissions are performed, but lost messages are +detected and reported to the receiver. The "Semi_Reliable" case +messages are held for a pre-specified amount of time, and +re-transmited if requested, but it is possible to loose some messages +if multiple re-transmissions fail. As in the "Best_Effort" case the +lost messages are detected and flagged to the application. Finally +in the "Reliable" mode the senders are flowed controlled until enough +messages are successfully transmitted. + + In general the stack looks like this: + + +SENDER: + +---------------------------------------------------------------- +Buffering : Save lost messages +Retransmission : Retransmit +---------------------------------------------------------------- +Fragmentation : Fragment messages in smaller chunks +Reassembly : and ensure that the IOVMAX limit is not + : reached +---------------------------------------------------------------- +Tranport : Encapsulate the specific transport media + : such as TCP/IP, ATM, or shared memory + : Demuxes incoming data to the right chain + : Change control messages and data messages + : to the right dynamic types. +---------------------------------------------------------------- + +RECEIVER: + +---------------------------------------------------------------- +Lost detection : Detect lost messages and send control + : messages back +---------------------------------------------------------------- +Reassembly : Reassemble messages, fragment control +Fragmentation : data +---------------------------------------------------------------- +Transport : Group membership, ACT reception, + : handle keep-alive messages... +---------------------------------------------------------------- + + +@@ TODO: Piggybacking... diff --git a/protocols/ace/RMCast/RMCast.dsp b/protocols/ace/RMCast/RMCast.dsp index 53fdbf5cb9c..e3a84679364 100644 --- a/protocols/ace/RMCast/RMCast.dsp +++ b/protocols/ace/RMCast/RMCast.dsp @@ -94,14 +94,50 @@ LINK32=link.exe # PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
# Begin Source File
+SOURCE=.\RMCast.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_IO_UDP.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.cpp
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.cpp
+# End Source File
# End Group
# Begin Group "Header Files"
# PROP Default_Filter "h;hpp;hxx;hm;inl"
# Begin Source File
+SOURCE=.\RMCast.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Export.h
# End Source File
# Begin Source File
@@ -110,42 +146,76 @@ SOURCE=.\RMCast_Fragment.h # End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.h
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.h
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.h
+# End Source File
# End Group
# Begin Group "Inline Files"
# PROP Default_Filter "i"
# Begin Source File
+SOURCE=.\RMCast.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Fragment.i
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.i
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.i
# End Source File
-# End Group
-# Begin Group "Template Files"
-
-# PROP Default_Filter ""
# Begin Source File
-SOURCE=.\RMCast_Fragment.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Event_Handler.i
# End Source File
# Begin Source File
-SOURCE=.\RMCast_Reassembly.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Proxy.i
# End Source File
# End Group
+# Begin Group "Template Files"
+
+# PROP Default_Filter ""
+# End Group
# End Target
# End Project
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h index 025f82a1bfb..654f391204b 100644 --- a/protocols/ace/RMCast/RMCast.h +++ b/protocols/ace/RMCast/RMCast.h @@ -62,8 +62,8 @@ public: // +---------+----------------------+ // | 32 bits | fragment_offset | // +---------+----------------------+ - // | 32 bits | payload_size | - // +---------+----------------------+ + // ? ? ? ? ? | 32 bits | payload_size | + // ? ? ? ? ? +---------+----------------------+ // | | payload | // +---------+----------------------+ // @@ -84,9 +84,9 @@ public: // +---------+----------------------+ // | 8 bits | MT_ACK | // +---------+----------------------+ - // | 32 bits | last_successful | + // | 32 bits | highest_in_sequence | // +---------+----------------------+ - // | 32 bits | last_received | + // | 32 bits | highest_received | // +---------+----------------------+ // @@ -196,26 +196,30 @@ public: ACE_Message_Block *payload; }; - struct Ack + struct Poll { - ACE_UINT32 expected; - ACE_UINT32 last_received; }; - struct Join + struct Ack_Join { + ACE_INT32 next_sequence_number; }; - struct Leave + struct Ack_Leave { }; - struct Ack_Join + struct Ack { - ACE_INT32 next_sequence_number; + ACE_UINT32 highest_in_sequence; + ACE_UINT32 highest_received; }; - struct Ack_Leave + struct Join + { + }; + + struct Leave { }; }; diff --git a/protocols/ace/RMCast/RMCast_Fragment.cpp b/protocols/ace/RMCast/RMCast_Fragment.cpp index b3baee4f972..976def7a241 100644 --- a/protocols/ace/RMCast/RMCast_Fragment.cpp +++ b/protocols/ace/RMCast/RMCast_Fragment.cpp @@ -25,7 +25,7 @@ ACE_RMCast_Fragment::~ACE_RMCast_Fragment (void) } int -ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data) +ACE_RMCast_Fragment::data (ACE_RMCast::Data &received_data) { if (this->next () == 0) return 0; @@ -38,7 +38,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data) // @@ We should keep the total size precomputed data.total_size = mb->total_size (); - // We must leave room for the header + // We must leave room for the header #if defined (ACE_HAS_BROKEN_DGRAM_SENDV) const int ACE_RMCAST_WRITEV_MAX = IOV_MAX - 2; #else @@ -126,7 +126,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data) + last_sent_mb_len); data.payload = blocks; - if (this->next ()->put_data (data) == -1) + if (this->next ()->data (data) == -1) return -1; // adjust the offset @@ -172,7 +172,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data) // have: if (iovcnt == ACE_RMCAST_WRITEV_MAX) { - if (this->next ()->put_data (data) == -1) + if (this->next ()->data (data) == -1) return -1; iovcnt = 0; @@ -184,5 +184,5 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data) if (iovcnt == 0) return 0; - return this->next ()->put_data (data); + return this->next ()->data (data); } diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h index e42440b6c12..7b64d763ebc 100644 --- a/protocols/ace/RMCast/RMCast_Fragment.h +++ b/protocols/ace/RMCast/RMCast_Fragment.h @@ -40,7 +40,7 @@ public: // feedback from the lower layer (transport?) // = The ACE_RMCast_Module methods - virtual int put_data (ACE_RMCast::Data &data); + virtual int data (ACE_RMCast::Data &data); private: size_t max_fragment_size_; diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp new file mode 100644 index 00000000000..89cc7ae3c3a --- /dev/null +++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp @@ -0,0 +1,418 @@ +// +// $Id$ +// + +#include "RMCast_IO_UDP.h" +#include "RMCast_UDP_Proxy.h" +#include "RMCast_Module_Factory.h" +#include "ace/Handle_Set.h" +#include "ace/Reactor.h" +#include "ace/Message_Block.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_IO_UDP.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_IO_UDP, "$Id$") + +ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void) +{ +} + +int +ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr, + int reuse_addr, + const ACE_TCHAR *net_if, + int protocol_family, + int protocol) +{ + this->mcast_group_ = mcast_addr; + return this->dgram_.subscribe (mcast_addr, + reuse_addr, + net_if, + protocol_family, + protocol); +} + +int +ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv) +{ + ACE_HANDLE h = this->dgram_.get_handle (); + if (h == ACE_INVALID_HANDLE) + return -1; + + ACE_Handle_Set handle_set; + handle_set.set_bit (h); + + ACE_Countdown_Time countdown (tv); + + int r = ACE_OS::select (int(h) + 1, + handle_set, 0, 0, + tv); + if (r == -1) + { + if (errno == EINTR) + return 0; + else + return -1; + } + else if (r == 0) + { + return 0; + } + + return this->handle_input (h); +} + +int +ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor) +{ + this->eh_.reactor (reactor); + return reactor->register_handler (&this->eh_, + ACE_Event_Handler::READ_MASK); +} + +int +ACE_RMCast_IO_UDP::remove_handlers (void) +{ + ACE_Reactor *r = this->eh_.reactor (); + if (r != 0) + { + r->remove_handler (&this->eh_, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + this->eh_.reactor (0); + } + return 0; +} + +int +ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE) +{ + // @@ We should use a system constant instead of this literal + const int max_udp_packet_size = 65536; + char buffer[max_udp_packet_size]; + + ACE_INET_Addr from_address; + ssize_t r = + this->dgram_.recv (buffer, sizeof(buffer), from_address); + + if (r == -1) + { + // @@ LOG?? + ACE_DEBUG ((LM_DEBUG, + "RMCast_IO_UDP::handle_input () - " + "error in recv\n")); + return -1; + } + + // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input")); + + // @@ Locking! + + int type = buffer[0]; + + if (type < 0 || type >= ACE_RMCast::MT_LAST) + { + // @@ Log: invalid message type!! + // @@ TODO: should we return -1? The socket is still valid, it + // makes little sense to destroy it just because one remote + // sender is sending invalid messages. Maybe we should + // strategize this too, and report the problem to the + // application, this could indicate a misconfiguration or + // something worse... + + // In any case the proxy should be destroyed, its peer is making + // something really wrong. + ACE_RMCast_UDP_Proxy *proxy; + if (this->map_.unbind (from_address, proxy) == 0) + { + this->factory_->destroy (proxy->module ()); + delete proxy; + } + return 0; + } + + ACE_RMCast_UDP_Proxy *proxy; + if (this->map_.find (from_address, proxy) != 0) + { + // State == RS_NON_EXISTENT + + // @@ We should validate the message *before* creating the + // object, all we need is some sort of validation strategy, a + // different one for the receiver and another one for the + // sender. +#if 0 + if (type == ACE_RMCast::MT_ACK + || type == ACE_RMCast::MT_JOIN + || type == ACE_RMCast::MT_LEAVE + || type == ACE_RMCast::MT_ACK_LEAVE) + { + // All these message types indicate a problem, the should be + // generated by receivers, not received by them. + return 0; + } +#endif /* 0 */ + + // The message type is valid, we must create a new proxy, + // initially in the JOINING state... + ACE_RMCast_Module *module = this->factory_->create (this); + if (module == 0) + { + // @@ LOG?? + // Try to continue working, maybe the module can be created + // later. + return 0; + } + ACE_NEW_RETURN (proxy, + ACE_RMCast_UDP_Proxy(this, + from_address, + module), + 0); + + if (this->map_.bind (from_address, proxy) != 0) + { + // @@ LOG?? + return 0; + } + + } + + // Have the proxy process the message and do the right thing. + return proxy->receive_message (buffer, r); +} + +ACE_HANDLE +ACE_RMCast_IO_UDP::get_handle (void) const +{ + return this->dgram_.get_handle (); +} + +int +ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data) +{ + return this->send_data (data, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll) +{ + return this->send_poll (poll, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join) +{ + return this->send_ack_join (ack_join, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave) +{ + return this->send_ack_leave (ack_leave, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack) +{ + return this->send_ack (ack, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join) +{ + return this->send_join (join, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave) +{ + return this->send_leave (leave, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data, + const ACE_INET_Addr &to) +{ + // The first message block contains the header + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + ACE_UINT32 tmp; + char header[1 + 3 * sizeof(ACE_UINT32)]; + header[0] = ACE_RMCast::MT_DATA; + + tmp = ACE_HTONL (data.sequence_number); + ACE_OS::memcpy (header + 1, + &tmp, sizeof(ACE_UINT32)); + tmp = ACE_HTONL (data.total_size); + ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), + &tmp, sizeof(ACE_UINT32)); + tmp = ACE_HTONL (data.fragment_offset); + ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32), + &tmp, sizeof(ACE_UINT32)); + + iovec iov[IOV_MAX]; + int iovcnt = 1; + + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + + ACE_Message_Block *mb = data.payload; + + for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ()) + { + iov[iovcnt].iov_base = i->rd_ptr (); + iov[iovcnt].iov_len = i->length (); + iovcnt++; + if (iovcnt >= IOV_MAX) + return -1; + } + + // @@ This pacing stuff here reduced the number of packet lost in + // loopback tests, but it should be taken out for real applications + // (or at least made configurable!) + ACE_Time_Value tv (0, 10000); + ACE_OS::sleep (tv); + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (iov, iovcnt, to) == -1) + return -1; + +#if 0 + ACE_HEX_DUMP ((LM_DEBUG, + (char*)iov[0].iov_base, iov[0].iov_len, "Sending")); +#endif + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &poll, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_POLL; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_ACK_JOIN; + + ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number); + ACE_OS::memcpy (header + 1, + &tmp, sizeof(ACE_UINT32)); + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &ack_leave, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_ACK_LEAVE; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_ACK; + + ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence); + ACE_OS::memcpy (header + 1, + &tmp, sizeof(ACE_UINT32)); + tmp = ACE_HTONL (ack.highest_received); + ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), + &tmp, sizeof(ACE_UINT32)); + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &join, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_JOIN; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_LEAVE; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>; + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.h b/protocols/ace/RMCast/RMCast_IO_UDP.h index bfc56d89705..bdcccabe6e1 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Receiver.h +++ b/protocols/ace/RMCast/RMCast_IO_UDP.h @@ -10,35 +10,36 @@ // // ============================================================================ -#ifndef ACE_RMCAST_UDP_RECEIVER_H -#define ACE_RMCAST_UDP_RECEIVER_H +#ifndef ACE_RMCAST_IO_UDP_H +#define ACE_RMCAST_IO_UDP_H #include "ace/pre.h" +#include "RMCast_Module.h" #include "RMCast_UDP_Event_Handler.h" #include "ace/SOCK_Dgram_Mcast.h" #include "ace/Hash_Map_Manager.h" #include "ace/Synch.h" +#include "ace/INET_Addr.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -class ACE_RMCast_Sender_Proxy; -class ACE_RMCast_Sender_Proxy_Factory; +class ACE_RMCast_UDP_Proxy; +class ACE_RMCast_Module_Factory; class ACE_Reactor; class ACE_Time_Value; -class ACE_INET_Addr; -class ACE_RMCast_Export ACE_RMCast_UDP_Receiver +class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module { public: - ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory); + ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory); // Constructor - // <factory> is used to create the Sender_Proxy and Modules that - // process incoming messages. - // The caller owns <factory>. + // <factory> is used to create the modules for each proxy that + // process incoming messages. The class does *not* assume ownership + // of <factory>, the caller owns it. - ~ACE_RMCast_UDP_Receiver (void); + ~ACE_RMCast_IO_UDP (void); // Destructor int subscribe (const ACE_INET_Addr &mcast_addr, @@ -69,25 +70,36 @@ public: ACE_HANDLE get_handle (void) const; // Obtain the handle for the underlying socket -private: - int send_join (ACE_INET_Addr &from); - // Send a JOIN message - - int send_ack (ACE_RMCast_Sender_Proxy *sender_proxy, - ACE_INET_Addr &from); - // Send an ACK message - - int send_leave (ACE_INET_Addr &from); - // Send a LEAVE messsage + // Send back to the remove object represented by <proxy> + int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &); + int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &); + int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &); + int send_ack_leave (ACE_RMCast::Ack_Leave &, const ACE_INET_Addr &); + int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &); + int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &); + int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &); + + // = The RMCast_Module methods + virtual int data (ACE_RMCast::Data &); + virtual int poll (ACE_RMCast::Poll &); + virtual int ack_join (ACE_RMCast::Ack_Join &); + virtual int ack_leave (ACE_RMCast::Ack_Leave &); + virtual int ack (ACE_RMCast::Ack &); + virtual int join (ACE_RMCast::Join &); + virtual int leave (ACE_RMCast::Leave &); + // The messages are sent to the multicast group private: - ACE_RMCast_Sender_Proxy_Factory *factory_; - // The factory used to create Sender proxies + ACE_RMCast_Module_Factory *factory_; + // The factory used to create the modules attached to each proxy + + ACE_INET_Addr mcast_group_; + // The multicast group we subscribe and send to ACE_SOCK_Dgram_Mcast dgram_; // The socket - typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex> Map; + typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map; Map map_; ACE_RMCast_UDP_Event_Handler eh_; @@ -95,8 +107,8 @@ private: }; #if defined (__ACE_INLINE__) -#include "RMCast_UDP_Receiver.i" +#include "RMCast_IO_UDP.i" #endif /* __ACE_INLINE__ */ #include "ace/post.h" -#endif /* ACE_RMCAST_UDP_RECEIVER_H */ +#endif /* ACE_RMCAST_IO_UDP_H */ diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.i b/protocols/ace/RMCast/RMCast_IO_UDP.i new file mode 100644 index 00000000000..ddacc5694ad --- /dev/null +++ b/protocols/ace/RMCast/RMCast_IO_UDP.i @@ -0,0 +1,9 @@ +// $Id$ + +ACE_INLINE +ACE_RMCast_IO_UDP:: + ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory) + : factory_ (factory) + , eh_ (this) +{ +} diff --git a/protocols/ace/RMCast/RMCast_Module.cpp b/protocols/ace/RMCast/RMCast_Module.cpp index b47694abe4d..632d905f900 100644 --- a/protocols/ace/RMCast/RMCast_Module.cpp +++ b/protocols/ace/RMCast/RMCast_Module.cpp @@ -55,3 +55,59 @@ ACE_RMCast_Module::close (void) { return 0; } + +int +ACE_RMCast_Module::data (ACE_RMCast::Data &data) +{ + if (this->next () != 0) + return this->next ()->data (data); + return 0; +} + +int +ACE_RMCast_Module::poll (ACE_RMCast::Poll &poll) +{ + if (this->next () != 0) + return this->next ()->poll (poll); + return 0; +} + +int +ACE_RMCast_Module::ack_join (ACE_RMCast::Ack_Join &ack_join) +{ + if (this->next () != 0) + return this->next ()->ack_join (ack_join); + return 0; +} + +int +ACE_RMCast_Module::ack_leave (ACE_RMCast::Ack_Leave &ack_leave) +{ + if (this->next () != 0) + return this->next ()->ack_leave (ack_leave); + return 0; +} + +int +ACE_RMCast_Module::ack (ACE_RMCast::Ack &ack) +{ + if (this->next () != 0) + return this->next ()->ack (ack); + return 0; +} + +int +ACE_RMCast_Module::join (ACE_RMCast::Join &join) +{ + if (this->next () != 0) + return this->next ()->join (join); + return 0; +} + +int +ACE_RMCast_Module::leave (ACE_RMCast::Leave &leave) +{ + if (this->next () != 0) + return this->next ()->leave (leave); + return 0; +} diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h index 30f3da2f4fe..9f83c2d5be4 100644 --- a/protocols/ace/RMCast/RMCast_Module.h +++ b/protocols/ace/RMCast/RMCast_Module.h @@ -59,7 +59,13 @@ public: virtual int close (void); // Close the module. - virtual int put_data (ACE_RMCast::Data &data) = 0; + virtual int data (ACE_RMCast::Data &); + virtual int poll (ACE_RMCast::Poll &); + virtual int ack_join (ACE_RMCast::Ack_Join &); + virtual int ack_leave (ACE_RMCast::Ack_Leave &); + virtual int ack (ACE_RMCast::Ack &); + virtual int join (ACE_RMCast::Join &); + virtual int leave (ACE_RMCast::Leave &); // Push data down the stack private: diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.cpp b/protocols/ace/RMCast/RMCast_Module_Factory.cpp new file mode 100644 index 00000000000..b749048a78c --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Module_Factory.cpp @@ -0,0 +1,13 @@ +// $Id$ + +#include "RMCast_Module_Factory.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_Module_Factory.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Module_Factory, "$Id$") + +ACE_RMCast_Module_Factory::~ACE_RMCast_Module_Factory (void) +{ +} diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h new file mode 100644 index 00000000000..722ad87d678 --- /dev/null +++ b/protocols/ace/RMCast/RMCast_Module_Factory.h @@ -0,0 +1,50 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// RMCast_Module_Factory.h +// +// = AUTHOR +// Carlos O'Ryan <coryan@uci.edu> +// +// ============================================================================ + +#ifndef ACE_RMCAST_MODULE_FACTORY_H +#define ACE_RMCAST_MODULE_FACTORY_H +#include "ace/pre.h" + +#include "RMCast.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class ACE_RMCast_Module; +class ACE_RMCast_IO_UDP; + +class ACE_RMCast_Export ACE_RMCast_Module_Factory +{ + // = DESCRIPTION + // +public: + virtual ~ACE_RMCast_Module_Factory (void); + // Destructor + + virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0; + // Create a new proxy + + virtual void destroy (ACE_RMCast_Module *) = 0; + // Destroy a proxy +}; + +#if defined (__ACE_INLINE__) +#include "RMCast_Module_Factory.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* ACE_RMCAST_MODULE_FACTORY_H */ diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i b/protocols/ace/RMCast/RMCast_Module_Factory.i index cfa1da318d3..cfa1da318d3 100644 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i +++ b/protocols/ace/RMCast/RMCast_Module_Factory.i diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp index a52791e1ebf..ba2e9b79c1a 100644 --- a/protocols/ace/RMCast/RMCast_Reassembly.cpp +++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp @@ -34,7 +34,7 @@ ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void) } int -ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data) +ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data) { if (this->next () == 0) return 0; @@ -42,7 +42,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data) if (data.payload->length () + data.fragment_offset > data.total_size) { ACE_DEBUG ((LM_DEBUG, - "RMCast_Reassembly::put_data - invalid size\n")); + "RMCast_Reassembly::data - invalid size\n")); return -1; // Corrupt message? } @@ -92,7 +92,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data) downstream_data.fragment_offset = 0; downstream_data.payload = message->message_body (); - int r = this->next ()->put_data (downstream_data); + int r = this->next ()->data (downstream_data); delete message; diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h index 0982d059c7c..0bf0c3a61ee 100644 --- a/protocols/ace/RMCast/RMCast_Reassembly.h +++ b/protocols/ace/RMCast/RMCast_Reassembly.h @@ -34,7 +34,7 @@ public: // Destructor // = The ACE_RMCast_Module methods - virtual int put_data (ACE_RMCast::Data &data); + virtual int data (ACE_RMCast::Data &data); private: ACE_SYNCH_MUTEX mutex_; diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp deleted file mode 100644 index ff1b7b33f15..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp +++ /dev/null @@ -1,20 +0,0 @@ -// $Id$ - -#include "RMCast_Sender_Proxy.h" -#include "RMCast_Module.h" -#include "ace/Message_Block.h" - -#if !defined (__ACE_INLINE__) -# include "RMCast_Sender_Proxy.i" -#endif /* ! __ACE_INLINE__ */ - -ACE_RCSID(ace, RMCast_Sender_Proxy, "$Id$") - -ACE_RMCast_Sender_Proxy::ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module) - : module_ (module) -{ -} - -ACE_RMCast_Sender_Proxy::~ACE_RMCast_Sender_Proxy (void) -{ -} diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.i b/protocols/ace/RMCast/RMCast_Sender_Proxy.i deleted file mode 100644 index b47573711ea..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy.i +++ /dev/null @@ -1,7 +0,0 @@ -// $Id$ - -ACE_INLINE ACE_RMCast_Module * -ACE_RMCast_Sender_Proxy::module (void) const -{ - return this->module_; -} diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp deleted file mode 100644 index ba525f245bc..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp +++ /dev/null @@ -1,72 +0,0 @@ -// $Id$ - -#include "RMCast_Sender_Proxy_Best_Effort.h" -#include "RMCast_Module.h" -#include "ace/Message_Block.h" - -#if !defined (__ACE_INLINE__) -# include "RMCast_Sender_Proxy_Best_Effort.i" -#endif /* ! __ACE_INLINE__ */ - -ACE_RCSID(ace, RMCast_Sender_Proxy_Best_Effort, "$Id$") - -ACE_RMCast_Sender_Proxy_Best_Effort:: - ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module) - : ACE_RMCast_Sender_Proxy (module) -{ -} - -ACE_RMCast_Sender_Proxy_Best_Effort:: - ~ACE_RMCast_Sender_Proxy_Best_Effort (void) -{ -} - -int -ACE_RMCast_Sender_Proxy_Best_Effort::receive_message (char *buffer, - size_t size) -{ - int type = buffer[0]; - - // All control messages are ignored... - if (type != ACE_RMCast::MT_DATA) - return 0; - - // @@ Push the event through the stack -#if 0 - ACE_DEBUG ((LM_DEBUG, - "Proxy(%x) - received data\n", long(this))); - ACE_HEX_DUMP ((LM_DEBUG, buffer, header, "Proxy")); -#endif - - const size_t header_size = 1 + 3 * sizeof(ACE_UINT32); - if (size < header_size) - { - // The message is too small - return 0; - } - - ACE_UINT32 tmp; - - ACE_RMCast::Data data; - - ACE_OS::memcpy (&tmp, buffer + 1, - sizeof(tmp)); - data.sequence_number = ACE_NTOHL (tmp); - - ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp), - sizeof(tmp)); - data.total_size = ACE_NTOHL (tmp); - - ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp), - sizeof(tmp)); - data.fragment_offset = ACE_NTOHL (tmp); - - // Pass it up the module... - ACE_Message_Block *mb; - ACE_NEW_RETURN (mb, ACE_Message_Block, -1); - mb->size (size - header_size); - mb->copy (buffer + header_size, size - header_size); - - data.payload = mb; - return this->module ()->put_data (data); -} diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h deleted file mode 100644 index 304e026afc3..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h +++ /dev/null @@ -1,53 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// ace -// -// = FILENAME -// RMCast_Sender_Proxy_Best_Effort.h -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - -#ifndef ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H -#define ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H -#include "ace/pre.h" - -#include "RMCast_Sender_Proxy.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -class ACE_RMCast_Module; - -class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Best_Effort : public ACE_RMCast_Sender_Proxy -{ - // = TITLE - // Reliable Multicast Sender Ambassador - // - // = DESCRIPTION - // Implement an Ambassador for the reliable multicast senders. - // -public: - ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module); - // Constructor - - ~ACE_RMCast_Sender_Proxy_Best_Effort (void); - // Destructor - - virtual int receive_message (char *buffer, size_t size); - // A DATA message was received. -}; - -#if defined (__ACE_INLINE__) -#include "RMCast_Sender_Proxy_Best_Effort.i" -#endif /* __ACE_INLINE__ */ - -#include "ace/post.h" -#endif /* ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H */ diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp deleted file mode 100644 index 48a82b5dfbc..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp +++ /dev/null @@ -1,13 +0,0 @@ -// $Id$ - -#include "RMCast_Sender_Proxy_Factory.h" - -#if !defined (__ACE_INLINE__) -# include "RMCast_Sender_Proxy_Factory.i" -#endif /* ! __ACE_INLINE__ */ - -ACE_RCSID(ace, RMCast_Sender_Proxy_Factory, "$Id$") - -ACE_RMCast_Sender_Proxy_Factory::~ACE_RMCast_Sender_Proxy_Factory (void) -{ -} diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h deleted file mode 100644 index 7dff4d2796f..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h +++ /dev/null @@ -1,55 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// ace -// -// = FILENAME -// RMCast_Sender_Proxy_Factory.h -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - -#ifndef ACE_RMCAST_SENDER_PROXY_FACTORY_H -#define ACE_RMCAST_SENDER_PROXY_FACTORY_H -#include "ace/pre.h" - -#include "RMCast.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -class ACE_RMCast_Sender_Proxy; - -class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Factory -{ - // = DESCRIPTION - // Defines the interface to create Sender_Proxies. - // The application provides a Sender_Proxy_Factory, this is used - // by the receiver side to create a different proxy for each - // remote sender. The application configures the proxy with the - // correct modules to process incoming events and achieve the - // desired level of reliability. - // -public: - virtual ~ACE_RMCast_Sender_Proxy_Factory (void); - // Destructor - - virtual ACE_RMCast_Sender_Proxy *create (void) = 0; - // Create a new proxy - - virtual void destroy (ACE_RMCast_Sender_Proxy *) = 0; - // Destroy a proxy -}; - -#if defined (__ACE_INLINE__) -#include "RMCast_Sender_Proxy_Factory.i" -#endif /* __ACE_INLINE__ */ - -#include "ace/post.h" -#endif /* ACE_RMCAST_SENDER_PROXY_FACTORY_H */ diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i deleted file mode 100644 index cfa1da318d3..00000000000 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i +++ /dev/null @@ -1 +0,0 @@ -// $Id$ diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp index 69cfc337113..e5ff8da2761 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp +++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp @@ -3,7 +3,7 @@ // #include "RMCast_UDP_Event_Handler.h" -#include "RMCast_UDP_Receiver.h" +#include "RMCast_IO_UDP.h" #if !defined (__ACE_INLINE__) # include "RMCast_UDP_Event_Handler.i" @@ -18,19 +18,19 @@ ACE_RMCast_UDP_Event_Handler::~ACE_RMCast_UDP_Event_Handler (void) ACE_HANDLE ACE_RMCast_UDP_Event_Handler::get_handle (void) const { - return this->receiver_->get_handle (); + return this->io_udp_->get_handle (); } int ACE_RMCast_UDP_Event_Handler::handle_input (ACE_HANDLE h) { - return this->receiver_->handle_input (h); + return this->io_udp_->handle_input (h); } int ACE_RMCast_UDP_Event_Handler::handle_timeout (const ACE_Time_Value &, const void *) { - // @@ return this->receiver_->handle_timeout (); + // @@ return this->io_udp_->handle_timeout (); return 0; } diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h index 193d7038cd8..02798cee7f8 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h +++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h @@ -4,7 +4,7 @@ // // = DESCRIPTION // Implement an adapter between the ACE Reactor and the -// ACE_RMCast_UDP_Receiver +// ACE_RMCast_IO_UDP // // = AUTHOR // Carlos O'Ryan <coryan@uci.edu> @@ -22,13 +22,13 @@ # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -class ACE_RMCast_UDP_Receiver; +class ACE_RMCast_IO_UDP; class ACE_INET_Addr; class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler { public: - ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver); + ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver); // Constructor ~ACE_RMCast_UDP_Event_Handler (void); @@ -41,7 +41,7 @@ public: const void *act = 0); private: - ACE_RMCast_UDP_Receiver *receiver_; + ACE_RMCast_IO_UDP *io_udp_; // The sender }; diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i index b35aeefa3f4..99b4c0ac7e5 100644 --- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i +++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i @@ -2,8 +2,7 @@ ACE_INLINE ACE_RMCast_UDP_Event_Handler:: -ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver) - : receiver_ (receiver) +ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io) + : io_udp_ (io) { } - diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp new file mode 100644 index 00000000000..1fbad27f2cd --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp @@ -0,0 +1,136 @@ +// $Id$ + +#include "RMCast_UDP_Proxy.h" +#include "RMCast_Module.h" +#include "ace/Message_Block.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_UDP_Proxy.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_UDP_Proxy, "$Id$") + +ACE_RMCast_UDP_Proxy::ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp, + const ACE_INET_Addr &addr, + ACE_RMCast_Module *module) + : io_udp_ (io_udp) + , peer_addr_ (addr) + , module_ (module) +{ +} + +ACE_RMCast_UDP_Proxy::~ACE_RMCast_UDP_Proxy (void) +{ +} + +int +ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size) +{ + int type = buffer[0]; + + // @@ What should we do with invalid messages like this? + // + if (type < 0 || type >= ACE_RMCast::MT_LAST) + return 0; + + if (type == ACE_RMCast::MT_POLL) + { + ACE_RMCast::Poll poll; + return this->module ()->poll (poll); + } + + else if (type == ACE_RMCast::MT_ACK_JOIN) + { + ACE_RMCast::Ack_Join ack_join; + const size_t header_size = 1 + sizeof(ACE_UINT32); + if (size < header_size) + { + // The message is too small + return 0; + } + + ACE_UINT32 tmp; + + ACE_OS::memcpy (&tmp, buffer + 1, + sizeof(tmp)); + ack_join.next_sequence_number = ACE_NTOHL (tmp); + return this->module ()->ack_join (ack_join); + } + + else if (type == ACE_RMCast::MT_ACK_LEAVE) + { + ACE_RMCast::Ack_Leave ack_leave; + return this->module ()->ack_leave (ack_leave); + } + + else if (type == ACE_RMCast::MT_DATA) + { + ACE_RMCast::Data data; + const size_t header_size = 1 + 3 * sizeof(ACE_UINT32); + if (size < header_size) + { + // The message is too small + return 0; + } + + ACE_UINT32 tmp; + + ACE_OS::memcpy (&tmp, buffer + 1, + sizeof(tmp)); + data.sequence_number = ACE_NTOHL (tmp); + + ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp), + sizeof(tmp)); + data.total_size = ACE_NTOHL (tmp); + + ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp), + sizeof(tmp)); + data.fragment_offset = ACE_NTOHL (tmp); + + // Pass it up the module... + ACE_Message_Block *mb; + ACE_NEW_RETURN (mb, ACE_Message_Block, -1); + mb->size (size - header_size); + mb->copy (buffer + header_size, size - header_size); + + data.payload = mb; + return this->module ()->data (data); + } + + else if (type == ACE_RMCast::MT_JOIN) + { + ACE_RMCast::Join join; + return this->module ()->join (join); + } + + else if (type == ACE_RMCast::MT_LEAVE) + { + ACE_RMCast::Leave leave; + return this->module ()->leave (leave); + } + + else if (type == ACE_RMCast::MT_ACK) + { + ACE_RMCast::Ack ack; + + const size_t header_size = 1 + sizeof(ACE_UINT32); + if (size < header_size) + { + // The message is too small + return 0; + } + + ACE_UINT32 tmp; + + ACE_OS::memcpy (&tmp, buffer + 1, + sizeof(tmp)); + ack.highest_in_sequence = ACE_NTOHL (tmp); + ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32), + sizeof(tmp)); + ack.highest_received = ACE_NTOHL (tmp); + + return this->module ()->ack (ack); + } + + return 0; +} diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.h b/protocols/ace/RMCast/RMCast_UDP_Proxy.h index c6b51f78b48..aa7e08c65be 100644 --- a/protocols/ace/RMCast/RMCast_Sender_Proxy.h +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.h @@ -7,26 +7,28 @@ // ace // // = FILENAME -// RMCast_Sender_Proxy.h +// RMCast_UDP_Proxy.h // // = AUTHOR // Carlos O'Ryan <coryan@uci.edu> // // ============================================================================ -#ifndef ACE_RMCAST_SENDER_PROXY_H -#define ACE_RMCAST_SENDER_PROXY_H +#ifndef ACE_RMCAST_UDP_PROXY_H +#define ACE_RMCAST_UDP_PROXY_H #include "ace/pre.h" #include "RMCast.h" +#include "ace/INET_Addr.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ class ACE_RMCast_Module; +class ACE_RMCast_IO_UDP; -class ACE_RMCast_Export ACE_RMCast_Sender_Proxy +class ACE_RMCast_Export ACE_RMCast_UDP_Proxy { // = TITLE // Reliable Multicast Sender Ambassador @@ -35,27 +37,37 @@ class ACE_RMCast_Export ACE_RMCast_Sender_Proxy // Implement an Ambassador for the reliable multicast senders. // public: - ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module); + ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp, + const ACE_INET_Addr &peer_addr, + ACE_RMCast_Module *module); // Constructor - virtual ~ACE_RMCast_Sender_Proxy (void); + virtual ~ACE_RMCast_UDP_Proxy (void); // Destructor - ACE_RMCast_Module *module (void) const; - // Return the internal module + int receive_message (char *buffer, size_t size); + // Receive the message + + const ACE_INET_Addr &peer_addr (void) const; + // The address of the peer - virtual int receive_message (char *buffer, size_t size) = 0; - // A new message has been received, process it + ACE_RMCast_Module *module (void) const; + // The module private: + ACE_RMCast_IO_UDP *io_udp_; + // The IO facade + + ACE_INET_Addr peer_addr_; + // The address of the peer + ACE_RMCast_Module *module_; - // Process the data, control messages are processed by the Sender - // proxy + // Process the data and control messages. }; #if defined (__ACE_INLINE__) -#include "RMCast_Sender_Proxy.i" +#include "RMCast_UDP_Proxy.i" #endif /* __ACE_INLINE__ */ #include "ace/post.h" -#endif /* ACE_RMCAST_SENDER_PROXY_H */ +#endif /* ACE_RMCAST_UDP_PROXY_H */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.i b/protocols/ace/RMCast/RMCast_UDP_Proxy.i new file mode 100644 index 00000000000..8ef6142ed7c --- /dev/null +++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.i @@ -0,0 +1,13 @@ +// $Id$ + +ACE_INLINE const ACE_INET_Addr& +ACE_RMCast_UDP_Proxy::peer_addr (void) const +{ + return this->peer_addr_; +} + +ACE_INLINE ACE_RMCast_Module * +ACE_RMCast_UDP_Proxy::module (void) const +{ + return this->module_; +} diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp b/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp deleted file mode 100644 index eeb03f50bcf..00000000000 --- a/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp +++ /dev/null @@ -1,241 +0,0 @@ -// -// $Id$ -// - -#include "RMCast_UDP_Receiver.h" -#include "RMCast_Sender_Proxy.h" -#include "RMCast_Sender_Proxy_Factory.h" -#include "ace/Handle_Set.h" -#include "ace/Reactor.h" - -#if !defined (__ACE_INLINE__) -# include "RMCast_UDP_Receiver.i" -#endif /* ! __ACE_INLINE__ */ - -ACE_RCSID(ace, RMCast_Receiver, "$Id$") - -ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void) -{ -} - -int -ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr, - int reuse_addr, - const ACE_TCHAR *net_if, - int protocol_family, - int protocol) -{ - return this->dgram_.subscribe (mcast_addr, - reuse_addr, - net_if, - protocol_family, - protocol); -} - -int -ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv) -{ - ACE_HANDLE h = this->dgram_.get_handle (); - if (h == ACE_INVALID_HANDLE) - return -1; - - ACE_Handle_Set handle_set; - handle_set.set_bit (h); - - ACE_Countdown_Time countdown (tv); - - int r = ACE_OS::select (int(h) + 1, - handle_set, 0, 0, - tv); - if (r == -1) - { - if (errno == EINTR) - return 0; - else - return -1; - } - else if (r == 0) - { - return 0; - } - - return this->handle_input (h); -} - -int -ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor) -{ - this->eh_.reactor (reactor); - return reactor->register_handler (&this->eh_, - ACE_Event_Handler::READ_MASK); -} - -int -ACE_RMCast_UDP_Receiver::remove_handlers (void) -{ - ACE_Reactor *r = this->eh_.reactor (); - if (r != 0) - { - r->remove_handler (&this->eh_, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - this->eh_.reactor (0); - } - return 0; -} - -int -ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE) -{ - // @@ We should use a system constant instead of this literal - const int max_udp_packet_size = 65536; - char buffer[max_udp_packet_size]; - - ACE_INET_Addr from_address; - ssize_t r = - this->dgram_.recv (buffer, sizeof(buffer), from_address); - - if (r == -1) - { - // @@ LOG?? - ACE_DEBUG ((LM_DEBUG, - "RMCast_UDP_Receiver::handle_input () - " - "error in recv\n")); - return -1; - } - - // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input")); - - // @@ Locking! - - int type = buffer[0]; - - ACE_RMCast_Sender_Proxy *sender_proxy; - if (this->map_.find (from_address, sender_proxy) != 0) - { - // State == RS_NON_EXISTENT - - if (type == ACE_RMCast::MT_ACK - || type == ACE_RMCast::MT_JOIN - || type == ACE_RMCast::MT_LEAVE - || type == ACE_RMCast::MT_ACK_LEAVE) - { - // All these message types indicate a problem, the should be - // generated by receivers, not received by them. - return 0; - } - - // The message type is valid, we must create a new proxy, - // initially in the JOINING state... - sender_proxy = - this->factory_->create (); - if (sender_proxy == 0) - { - // @@ LOG?? - return 0; - } - if (this->map_.bind (from_address, sender_proxy) != 0) - { - // @@ LOG?? - return 0; - } - - // Send back a JOIN message... - return sender_proxy->receive_message (buffer, r); - } - - if (type == ACE_RMCast::MT_ACK - || type == ACE_RMCast::MT_JOIN - || type == ACE_RMCast::MT_LEAVE - || type == ACE_RMCast::MT_ACK_LEAVE - || type < 0 - || type >= ACE_RMCast::MT_LAST) - { - // In this case the message is invalid, but the proxy is already - // in the table, must destroy it because there was a violation - // in the protocol.... - - this->factory_->destroy (sender_proxy); - this->map_.unbind (from_address); - return 0; - } - - return sender_proxy->receive_message (buffer, r); -} - -ACE_HANDLE -ACE_RMCast_UDP_Receiver::get_handle (void) const -{ - return this->dgram_.get_handle (); -} - -#if 0 -int -ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from) -{ - char buffer[16]; - buffer[0] = ACE_RMCast::MT_JOIN; - - ACE_SOCK_Dgram &dgram = this->dgram_; - ssize_t r = dgram.send (buffer, 1, from); - - if (r == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy, - ACE_INET_Addr &from) -{ - char buffer[16]; - buffer[0] = ACE_RMCast::MT_ACK; - - ACE_UINT32 expected = sender_proxy->expected (); - expected = ACE_HTONL (expected); - - ACE_UINT32 last_received = sender_proxy->last_received (); - last_received = ACE_HTONL (last_received); - - ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected)); - ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received, - sizeof(last_received)); - - ACE_SOCK_Dgram &dgram = this->dgram_; - ssize_t r = dgram.send (buffer, - + sizeof(expected) - + sizeof(last_received), - from); - - if (r == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from) -{ - char buffer[16]; - buffer[0] = ACE_RMCast::MT_LEAVE; - - ACE_SOCK_Dgram &dgram = this->dgram_; - ssize_t r = dgram.send (buffer, 1, from); - - if (r == -1) - return -1; - - return 0; -} -#endif /* 0 */ - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; -template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*>; - -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.i b/protocols/ace/RMCast/RMCast_UDP_Receiver.i deleted file mode 100644 index 81aeb8e2752..00000000000 --- a/protocols/ace/RMCast/RMCast_UDP_Receiver.i +++ /dev/null @@ -1,9 +0,0 @@ -// $Id$ - -ACE_INLINE -ACE_RMCast_UDP_Receiver:: - ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory) - : factory_ (factory) - , eh_ (this) -{ -} diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.cpp b/protocols/ace/RMCast/RMCast_UDP_Sender.cpp deleted file mode 100644 index c02ad6fb9cf..00000000000 --- a/protocols/ace/RMCast/RMCast_UDP_Sender.cpp +++ /dev/null @@ -1,91 +0,0 @@ -// -// $Id$ -// - -#include "RMCast_UDP_Sender.h" -#include "RMCast.h" -#include "ace/Message_Block.h" - -#if !defined (__ACE_INLINE__) -# include "RMCast_UDP_Sender.i" -#endif /* ! __ACE_INLINE__ */ - -ACE_RCSID(ace, RMCast_UDP_Sender, "$Id$") - -ACE_RMCast_UDP_Sender::ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr) - : ACE_RMCast_Module () - , mcast_addr_ (mcast_addr) -{ -} - -ACE_RMCast_UDP_Sender::~ACE_RMCast_UDP_Sender (void) -{ -} - -int -ACE_RMCast_UDP_Sender::open (void) -{ - return this->dgram_.open (ACE_Addr::sap_any); -} - -int -ACE_RMCast_UDP_Sender::close (void) -{ - return this->dgram_.close (); -} - -int -ACE_RMCast_UDP_Sender::put_data (ACE_RMCast::Data &data) -{ - // The first message block contains the header - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - ACE_UINT32 tmp; - char header[1 + 3 * sizeof(ACE_UINT32)]; - header[0] = ACE_RMCast::MT_DATA; - - tmp = ACE_HTONL (data.sequence_number); - ACE_OS::memcpy (header + 1, - &tmp, sizeof(ACE_UINT32)); - tmp = ACE_HTONL (data.total_size); - ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), - &tmp, sizeof(ACE_UINT32)); - tmp = ACE_HTONL (data.fragment_offset); - ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32), - &tmp, sizeof(ACE_UINT32)); - - iovec iov[IOV_MAX]; - int iovcnt = 1; - - iov[0].iov_base = header; - iov[0].iov_len = sizeof(header); - - ACE_Message_Block *mb = data.payload; - - for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ()) - { - iov[iovcnt].iov_base = i->rd_ptr (); - iov[iovcnt].iov_len = i->length (); - iovcnt++; - if (iovcnt >= IOV_MAX) - return -1; - } - - ACE_Time_Value tv (0, 10000); - ACE_OS::sleep (tv); - if (this->dgram_.send (iov, iovcnt, - this->mcast_addr_) == -1) - return -1; - -#if 0 - ACE_HEX_DUMP ((LM_DEBUG, - (char*)iov[0].iov_base, iov[0].iov_len, "Sending")); -#endif - - return 0; -} - - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.h b/protocols/ace/RMCast/RMCast_UDP_Sender.h deleted file mode 100644 index 474ebbc7f27..00000000000 --- a/protocols/ace/RMCast/RMCast_UDP_Sender.h +++ /dev/null @@ -1,70 +0,0 @@ -/* -*- C++ -*- */ -// $Id$ - -// ============================================================================ -// -// = LIBRARY -// ace -// -// = FILENAME -// RMCast_UDP_Sender.h -// -// = AUTHOR -// Carlos O'Ryan <coryan@uci.edu> -// -// ============================================================================ - -#ifndef ACE_RMCAST_UDP_SENDER_H -#define ACE_RMCAST_UDP_SENDER_H -#include "ace/pre.h" - -#include "RMCast_Module.h" -#include "ace/SOCK_Dgram.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -class ACE_RMCast_Export ACE_RMCast_UDP_Sender : public ACE_RMCast_Module -{ - // = TITLE - // Reliable Multicast UDP_Sender - // - // = DESCRIPTION - // Implements a Facade to the classes that implement a reliable - // multicast protocol. -public: - // = Initialization and termination methods. - ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr); - // Constructor - - virtual ~ACE_RMCast_UDP_Sender (void); - // Destructor - - // = The RMCast_Module methods - virtual int open (void); - virtual int close (void); - virtual int put_data (ACE_RMCast::Data &data); - // Send the Message block, this is the callback invoked at the end - // of the stack. - -protected: - ACE_SOCK_Dgram dgram_; - // This is the socket used to send the multicast data. - // @@ This should be strategized, what if we want to use something - // like ATM networks to send the data, then the types would be - // different.... - - ACE_INET_Addr mcast_addr_; - // The multicast group we send to. - // @@ Can we really strategize the addressing, without introducing - // too much complexity? How can we decouple the reliability aspect - // from the transport aspects of the system??? -}; - -#if defined (__ACE_INLINE__) -#include "RMCast_UDP_Sender.i" -#endif /* __ACE_INLINE__ */ - -#include "ace/post.h" -#endif /* ACE_RMCAST_UDP_SENDER_H */ diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.i b/protocols/ace/RMCast/RMCast_UDP_Sender.i deleted file mode 100644 index cfa1da318d3..00000000000 --- a/protocols/ace/RMCast/RMCast_UDP_Sender.i +++ /dev/null @@ -1 +0,0 @@ -// $Id$ |