summaryrefslogtreecommitdiff
path: root/protocols
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-08-21 16:09:37 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2000-08-21 16:09:37 +0000
commit71638b50fdc367c2d98e05eccd290058fdb2cc9f (patch)
treeb9bc524019091a267284dd35b7eaa23c4971d4ce /protocols
parentd5705c3b52b6c14d046bcae31639f02b32900152 (diff)
downloadATCD-71638b50fdc367c2d98e05eccd290058fdb2cc9f.tar.gz
ChangeLogTag:Mon Aug 21 08:58:19 2000 Carlos O'Ryan <coryan@uci.edu>
Diffstat (limited to 'protocols')
-rw-r--r--protocols/ace/RMCast/README57
-rw-r--r--protocols/ace/RMCast/RMCast.dsp86
-rw-r--r--protocols/ace/RMCast/RMCast.h28
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.cpp10
-rw-r--r--protocols/ace/RMCast/RMCast_Fragment.h2
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.cpp418
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.h (renamed from protocols/ace/RMCast/RMCast_UDP_Receiver.h)64
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.i9
-rw-r--r--protocols/ace/RMCast/RMCast_Module.cpp56
-rw-r--r--protocols/ace/RMCast/RMCast_Module.h8
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.cpp13
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.h50
-rw-r--r--protocols/ace/RMCast/RMCast_Module_Factory.i (renamed from protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i)0
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.cpp6
-rw-r--r--protocols/ace/RMCast/RMCast_Reassembly.h2
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy.cpp20
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy.i7
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp72
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h53
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp13
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h55
-rw-r--r--protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i1
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp8
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.h8
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Event_Handler.i5
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.cpp136
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.h (renamed from protocols/ace/RMCast/RMCast_Sender_Proxy.h)40
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Proxy.i13
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Receiver.cpp241
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Receiver.i9
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Sender.cpp91
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Sender.h70
-rw-r--r--protocols/ace/RMCast/RMCast_UDP_Sender.i1
33 files changed, 937 insertions, 715 deletions
diff --git a/protocols/ace/RMCast/README b/protocols/ace/RMCast/README
new file mode 100644
index 00000000000..2dd0c5d9cfc
--- /dev/null
+++ b/protocols/ace/RMCast/README
@@ -0,0 +1,57 @@
+# $Id$
+
+ This directory will contain a simple, small-scale reliable
+multicast framework for ACE. The framework is based on the ASX
+components of the ACE library: the protocol is implemented as a stack
+of interchangeable "modules", each one in charge of a very small task.
+For example, one module implements fragmentation and reassembly, other
+modules implement retransmission, send ACK and NAK messages, and
+maintain receiver membership.
+
+ The modules are replaced to achieve different levels of
+reliability. For example, the retransmission module can be either the
+"Best_Effort", "Semi_Reliable" or "Reliable" implementation. In the
+first case no retransmissions are performed, but lost messages are
+detected and reported to the receiver. The "Semi_Reliable" case
+messages are held for a pre-specified amount of time, and
+re-transmited if requested, but it is possible to loose some messages
+if multiple re-transmissions fail. As in the "Best_Effort" case the
+lost messages are detected and flagged to the application. Finally
+in the "Reliable" mode the senders are flowed controlled until enough
+messages are successfully transmitted.
+
+ In general the stack looks like this:
+
+
+SENDER:
+
+----------------------------------------------------------------
+Buffering : Save lost messages
+Retransmission : Retransmit
+----------------------------------------------------------------
+Fragmentation : Fragment messages in smaller chunks
+Reassembly : and ensure that the IOVMAX limit is not
+ : reached
+----------------------------------------------------------------
+Tranport : Encapsulate the specific transport media
+ : such as TCP/IP, ATM, or shared memory
+ : Demuxes incoming data to the right chain
+ : Change control messages and data messages
+ : to the right dynamic types.
+----------------------------------------------------------------
+
+RECEIVER:
+
+----------------------------------------------------------------
+Lost detection : Detect lost messages and send control
+ : messages back
+----------------------------------------------------------------
+Reassembly : Reassemble messages, fragment control
+Fragmentation : data
+----------------------------------------------------------------
+Transport : Group membership, ACT reception,
+ : handle keep-alive messages...
+----------------------------------------------------------------
+
+
+@@ TODO: Piggybacking...
diff --git a/protocols/ace/RMCast/RMCast.dsp b/protocols/ace/RMCast/RMCast.dsp
index 53fdbf5cb9c..e3a84679364 100644
--- a/protocols/ace/RMCast/RMCast.dsp
+++ b/protocols/ace/RMCast/RMCast.dsp
@@ -94,14 +94,50 @@ LINK32=link.exe
# PROP Default_Filter "cpp;c;cxx;rc;def;r;odl;idl;hpj;bat"
# Begin Source File
+SOURCE=.\RMCast.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Fragment.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_IO_UDP.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.cpp
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Reassembly.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.cpp
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.cpp
+# End Source File
# End Group
# Begin Group "Header Files"
# PROP Default_Filter "h;hpp;hxx;hm;inl"
# Begin Source File
+SOURCE=.\RMCast.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Export.h
# End Source File
# Begin Source File
@@ -110,42 +146,76 @@ SOURCE=.\RMCast_Fragment.h
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.h
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.h
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.h
# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Event_Handler.h
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_UDP_Proxy.h
+# End Source File
# End Group
# Begin Group "Inline Files"
# PROP Default_Filter "i"
# Begin Source File
+SOURCE=.\RMCast.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Fragment.i
# End Source File
# Begin Source File
+SOURCE=.\RMCast_IO_UDP.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module.i
+# End Source File
+# Begin Source File
+
+SOURCE=.\RMCast_Module_Factory.i
+# End Source File
+# Begin Source File
+
SOURCE=.\RMCast_Partial_Message.i
# End Source File
# Begin Source File
SOURCE=.\RMCast_Reassembly.i
# End Source File
-# End Group
-# Begin Group "Template Files"
-
-# PROP Default_Filter ""
# Begin Source File
-SOURCE=.\RMCast_Fragment.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Event_Handler.i
# End Source File
# Begin Source File
-SOURCE=.\RMCast_Reassembly.cpp
-# PROP Exclude_From_Build 1
+SOURCE=.\RMCast_UDP_Proxy.i
# End Source File
# End Group
+# Begin Group "Template Files"
+
+# PROP Default_Filter ""
+# End Group
# End Target
# End Project
diff --git a/protocols/ace/RMCast/RMCast.h b/protocols/ace/RMCast/RMCast.h
index 025f82a1bfb..654f391204b 100644
--- a/protocols/ace/RMCast/RMCast.h
+++ b/protocols/ace/RMCast/RMCast.h
@@ -62,8 +62,8 @@ public:
// +---------+----------------------+
// | 32 bits | fragment_offset |
// +---------+----------------------+
- // | 32 bits | payload_size |
- // +---------+----------------------+
+ // ? ? ? ? ? | 32 bits | payload_size |
+ // ? ? ? ? ? +---------+----------------------+
// | | payload |
// +---------+----------------------+
//
@@ -84,9 +84,9 @@ public:
// +---------+----------------------+
// | 8 bits | MT_ACK |
// +---------+----------------------+
- // | 32 bits | last_successful |
+ // | 32 bits | highest_in_sequence |
// +---------+----------------------+
- // | 32 bits | last_received |
+ // | 32 bits | highest_received |
// +---------+----------------------+
//
@@ -196,26 +196,30 @@ public:
ACE_Message_Block *payload;
};
- struct Ack
+ struct Poll
{
- ACE_UINT32 expected;
- ACE_UINT32 last_received;
};
- struct Join
+ struct Ack_Join
{
+ ACE_INT32 next_sequence_number;
};
- struct Leave
+ struct Ack_Leave
{
};
- struct Ack_Join
+ struct Ack
{
- ACE_INT32 next_sequence_number;
+ ACE_UINT32 highest_in_sequence;
+ ACE_UINT32 highest_received;
};
- struct Ack_Leave
+ struct Join
+ {
+ };
+
+ struct Leave
{
};
};
diff --git a/protocols/ace/RMCast/RMCast_Fragment.cpp b/protocols/ace/RMCast/RMCast_Fragment.cpp
index b3baee4f972..976def7a241 100644
--- a/protocols/ace/RMCast/RMCast_Fragment.cpp
+++ b/protocols/ace/RMCast/RMCast_Fragment.cpp
@@ -25,7 +25,7 @@ ACE_RMCast_Fragment::~ACE_RMCast_Fragment (void)
}
int
-ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
+ACE_RMCast_Fragment::data (ACE_RMCast::Data &received_data)
{
if (this->next () == 0)
return 0;
@@ -38,7 +38,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
// @@ We should keep the total size precomputed
data.total_size = mb->total_size ();
- // We must leave room for the header
+ // We must leave room for the header
#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
const int ACE_RMCAST_WRITEV_MAX = IOV_MAX - 2;
#else
@@ -126,7 +126,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
+ last_sent_mb_len);
data.payload = blocks;
- if (this->next ()->put_data (data) == -1)
+ if (this->next ()->data (data) == -1)
return -1;
// adjust the offset
@@ -172,7 +172,7 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
// have:
if (iovcnt == ACE_RMCAST_WRITEV_MAX)
{
- if (this->next ()->put_data (data) == -1)
+ if (this->next ()->data (data) == -1)
return -1;
iovcnt = 0;
@@ -184,5 +184,5 @@ ACE_RMCast_Fragment::put_data (ACE_RMCast::Data &received_data)
if (iovcnt == 0)
return 0;
- return this->next ()->put_data (data);
+ return this->next ()->data (data);
}
diff --git a/protocols/ace/RMCast/RMCast_Fragment.h b/protocols/ace/RMCast/RMCast_Fragment.h
index e42440b6c12..7b64d763ebc 100644
--- a/protocols/ace/RMCast/RMCast_Fragment.h
+++ b/protocols/ace/RMCast/RMCast_Fragment.h
@@ -40,7 +40,7 @@ public:
// feedback from the lower layer (transport?)
// = The ACE_RMCast_Module methods
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
size_t max_fragment_size_;
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
new file mode 100644
index 00000000000..89cc7ae3c3a
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
@@ -0,0 +1,418 @@
+//
+// $Id$
+//
+
+#include "RMCast_IO_UDP.h"
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module_Factory.h"
+#include "ace/Handle_Set.h"
+#include "ace/Reactor.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_IO_UDP.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_IO_UDP, "$Id$")
+
+ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void)
+{
+}
+
+int
+ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr,
+ int reuse_addr,
+ const ACE_TCHAR *net_if,
+ int protocol_family,
+ int protocol)
+{
+ this->mcast_group_ = mcast_addr;
+ return this->dgram_.subscribe (mcast_addr,
+ reuse_addr,
+ net_if,
+ protocol_family,
+ protocol);
+}
+
+int
+ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv)
+{
+ ACE_HANDLE h = this->dgram_.get_handle ();
+ if (h == ACE_INVALID_HANDLE)
+ return -1;
+
+ ACE_Handle_Set handle_set;
+ handle_set.set_bit (h);
+
+ ACE_Countdown_Time countdown (tv);
+
+ int r = ACE_OS::select (int(h) + 1,
+ handle_set, 0, 0,
+ tv);
+ if (r == -1)
+ {
+ if (errno == EINTR)
+ return 0;
+ else
+ return -1;
+ }
+ else if (r == 0)
+ {
+ return 0;
+ }
+
+ return this->handle_input (h);
+}
+
+int
+ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor)
+{
+ this->eh_.reactor (reactor);
+ return reactor->register_handler (&this->eh_,
+ ACE_Event_Handler::READ_MASK);
+}
+
+int
+ACE_RMCast_IO_UDP::remove_handlers (void)
+{
+ ACE_Reactor *r = this->eh_.reactor ();
+ if (r != 0)
+ {
+ r->remove_handler (&this->eh_,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ this->eh_.reactor (0);
+ }
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE)
+{
+ // @@ We should use a system constant instead of this literal
+ const int max_udp_packet_size = 65536;
+ char buffer[max_udp_packet_size];
+
+ ACE_INET_Addr from_address;
+ ssize_t r =
+ this->dgram_.recv (buffer, sizeof(buffer), from_address);
+
+ if (r == -1)
+ {
+ // @@ LOG??
+ ACE_DEBUG ((LM_DEBUG,
+ "RMCast_IO_UDP::handle_input () - "
+ "error in recv\n"));
+ return -1;
+ }
+
+ // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
+
+ // @@ Locking!
+
+ int type = buffer[0];
+
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ {
+ // @@ Log: invalid message type!!
+ // @@ TODO: should we return -1? The socket is still valid, it
+ // makes little sense to destroy it just because one remote
+ // sender is sending invalid messages. Maybe we should
+ // strategize this too, and report the problem to the
+ // application, this could indicate a misconfiguration or
+ // something worse...
+
+ // In any case the proxy should be destroyed, its peer is making
+ // something really wrong.
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.unbind (from_address, proxy) == 0)
+ {
+ this->factory_->destroy (proxy->module ());
+ delete proxy;
+ }
+ return 0;
+ }
+
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.find (from_address, proxy) != 0)
+ {
+ // State == RS_NON_EXISTENT
+
+ // @@ We should validate the message *before* creating the
+ // object, all we need is some sort of validation strategy, a
+ // different one for the receiver and another one for the
+ // sender.
+#if 0
+ if (type == ACE_RMCast::MT_ACK
+ || type == ACE_RMCast::MT_JOIN
+ || type == ACE_RMCast::MT_LEAVE
+ || type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ // All these message types indicate a problem, the should be
+ // generated by receivers, not received by them.
+ return 0;
+ }
+#endif /* 0 */
+
+ // The message type is valid, we must create a new proxy,
+ // initially in the JOINING state...
+ ACE_RMCast_Module *module = this->factory_->create (this);
+ if (module == 0)
+ {
+ // @@ LOG??
+ // Try to continue working, maybe the module can be created
+ // later.
+ return 0;
+ }
+ ACE_NEW_RETURN (proxy,
+ ACE_RMCast_UDP_Proxy(this,
+ from_address,
+ module),
+ 0);
+
+ if (this->map_.bind (from_address, proxy) != 0)
+ {
+ // @@ LOG??
+ return 0;
+ }
+
+ }
+
+ // Have the proxy process the message and do the right thing.
+ return proxy->receive_message (buffer, r);
+}
+
+ACE_HANDLE
+ACE_RMCast_IO_UDP::get_handle (void) const
+{
+ return this->dgram_.get_handle ();
+}
+
+int
+ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data)
+{
+ return this->send_data (data, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll)
+{
+ return this->send_poll (poll, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ return this->send_ack_join (ack_join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ return this->send_ack_leave (ack_leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack)
+{
+ return this->send_ack (ack, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join)
+{
+ return this->send_join (join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave)
+{
+ return this->send_leave (leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data,
+ const ACE_INET_Addr &to)
+{
+ // The first message block contains the header
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ ACE_UINT32 tmp;
+ char header[1 + 3 * sizeof(ACE_UINT32)];
+ header[0] = ACE_RMCast::MT_DATA;
+
+ tmp = ACE_HTONL (data.sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.total_size);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.fragment_offset);
+ ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 1;
+
+ iov[0].iov_base = header;
+ iov[0].iov_len = sizeof(header);
+
+ ACE_Message_Block *mb = data.payload;
+
+ for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
+ {
+ iov[iovcnt].iov_base = i->rd_ptr ();
+ iov[iovcnt].iov_len = i->length ();
+ iovcnt++;
+ if (iovcnt >= IOV_MAX)
+ return -1;
+ }
+
+ // @@ This pacing stuff here reduced the number of packet lost in
+ // loopback tests, but it should be taken out for real applications
+ // (or at least made configurable!)
+ ACE_Time_Value tv (0, 10000);
+ ACE_OS::sleep (tv);
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (iov, iovcnt, to) == -1)
+ return -1;
+
+#if 0
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
+#endif
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &poll,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_POLL;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_JOIN;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &ack_leave,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (ack.highest_received);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_JOIN;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>;
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.h b/protocols/ace/RMCast/RMCast_IO_UDP.h
index bfc56d89705..bdcccabe6e1 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Receiver.h
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.h
@@ -10,35 +10,36 @@
//
// ============================================================================
-#ifndef ACE_RMCAST_UDP_RECEIVER_H
-#define ACE_RMCAST_UDP_RECEIVER_H
+#ifndef ACE_RMCAST_IO_UDP_H
+#define ACE_RMCAST_IO_UDP_H
#include "ace/pre.h"
+#include "RMCast_Module.h"
#include "RMCast_UDP_Event_Handler.h"
#include "ace/SOCK_Dgram_Mcast.h"
#include "ace/Hash_Map_Manager.h"
#include "ace/Synch.h"
+#include "ace/INET_Addr.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_RMCast_Sender_Proxy;
-class ACE_RMCast_Sender_Proxy_Factory;
+class ACE_RMCast_UDP_Proxy;
+class ACE_RMCast_Module_Factory;
class ACE_Reactor;
class ACE_Time_Value;
-class ACE_INET_Addr;
-class ACE_RMCast_Export ACE_RMCast_UDP_Receiver
+class ACE_RMCast_Export ACE_RMCast_IO_UDP : public ACE_RMCast_Module
{
public:
- ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory);
+ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory);
// Constructor
- // <factory> is used to create the Sender_Proxy and Modules that
- // process incoming messages.
- // The caller owns <factory>.
+ // <factory> is used to create the modules for each proxy that
+ // process incoming messages. The class does *not* assume ownership
+ // of <factory>, the caller owns it.
- ~ACE_RMCast_UDP_Receiver (void);
+ ~ACE_RMCast_IO_UDP (void);
// Destructor
int subscribe (const ACE_INET_Addr &mcast_addr,
@@ -69,25 +70,36 @@ public:
ACE_HANDLE get_handle (void) const;
// Obtain the handle for the underlying socket
-private:
- int send_join (ACE_INET_Addr &from);
- // Send a JOIN message
-
- int send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
- ACE_INET_Addr &from);
- // Send an ACK message
-
- int send_leave (ACE_INET_Addr &from);
- // Send a LEAVE messsage
+ // Send back to the remove object represented by <proxy>
+ int send_data (ACE_RMCast::Data &, const ACE_INET_Addr &);
+ int send_poll (ACE_RMCast::Poll &, const ACE_INET_Addr &);
+ int send_ack_join (ACE_RMCast::Ack_Join &, const ACE_INET_Addr &);
+ int send_ack_leave (ACE_RMCast::Ack_Leave &, const ACE_INET_Addr &);
+ int send_ack (ACE_RMCast::Ack &, const ACE_INET_Addr &);
+ int send_join (ACE_RMCast::Join &, const ACE_INET_Addr &);
+ int send_leave (ACE_RMCast::Leave &, const ACE_INET_Addr &);
+
+ // = The RMCast_Module methods
+ virtual int data (ACE_RMCast::Data &);
+ virtual int poll (ACE_RMCast::Poll &);
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+ virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ virtual int ack (ACE_RMCast::Ack &);
+ virtual int join (ACE_RMCast::Join &);
+ virtual int leave (ACE_RMCast::Leave &);
+ // The messages are sent to the multicast group
private:
- ACE_RMCast_Sender_Proxy_Factory *factory_;
- // The factory used to create Sender proxies
+ ACE_RMCast_Module_Factory *factory_;
+ // The factory used to create the modules attached to each proxy
+
+ ACE_INET_Addr mcast_group_;
+ // The multicast group we subscribe and send to
ACE_SOCK_Dgram_Mcast dgram_;
// The socket
- typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex> Map;
+ typedef ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex> Map;
Map map_;
ACE_RMCast_UDP_Event_Handler eh_;
@@ -95,8 +107,8 @@ private:
};
#if defined (__ACE_INLINE__)
-#include "RMCast_UDP_Receiver.i"
+#include "RMCast_IO_UDP.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
-#endif /* ACE_RMCAST_UDP_RECEIVER_H */
+#endif /* ACE_RMCAST_IO_UDP_H */
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.i b/protocols/ace/RMCast/RMCast_IO_UDP.i
new file mode 100644
index 00000000000..ddacc5694ad
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.i
@@ -0,0 +1,9 @@
+// $Id$
+
+ACE_INLINE
+ACE_RMCast_IO_UDP::
+ ACE_RMCast_IO_UDP (ACE_RMCast_Module_Factory *factory)
+ : factory_ (factory)
+ , eh_ (this)
+{
+}
diff --git a/protocols/ace/RMCast/RMCast_Module.cpp b/protocols/ace/RMCast/RMCast_Module.cpp
index b47694abe4d..632d905f900 100644
--- a/protocols/ace/RMCast/RMCast_Module.cpp
+++ b/protocols/ace/RMCast/RMCast_Module.cpp
@@ -55,3 +55,59 @@ ACE_RMCast_Module::close (void)
{
return 0;
}
+
+int
+ACE_RMCast_Module::data (ACE_RMCast::Data &data)
+{
+ if (this->next () != 0)
+ return this->next ()->data (data);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::poll (ACE_RMCast::Poll &poll)
+{
+ if (this->next () != 0)
+ return this->next ()->poll (poll);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ if (this->next () != 0)
+ return this->next ()->ack_join (ack_join);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ if (this->next () != 0)
+ return this->next ()->ack_leave (ack_leave);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::ack (ACE_RMCast::Ack &ack)
+{
+ if (this->next () != 0)
+ return this->next ()->ack (ack);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::join (ACE_RMCast::Join &join)
+{
+ if (this->next () != 0)
+ return this->next ()->join (join);
+ return 0;
+}
+
+int
+ACE_RMCast_Module::leave (ACE_RMCast::Leave &leave)
+{
+ if (this->next () != 0)
+ return this->next ()->leave (leave);
+ return 0;
+}
diff --git a/protocols/ace/RMCast/RMCast_Module.h b/protocols/ace/RMCast/RMCast_Module.h
index 30f3da2f4fe..9f83c2d5be4 100644
--- a/protocols/ace/RMCast/RMCast_Module.h
+++ b/protocols/ace/RMCast/RMCast_Module.h
@@ -59,7 +59,13 @@ public:
virtual int close (void);
// Close the module.
- virtual int put_data (ACE_RMCast::Data &data) = 0;
+ virtual int data (ACE_RMCast::Data &);
+ virtual int poll (ACE_RMCast::Poll &);
+ virtual int ack_join (ACE_RMCast::Ack_Join &);
+ virtual int ack_leave (ACE_RMCast::Ack_Leave &);
+ virtual int ack (ACE_RMCast::Ack &);
+ virtual int join (ACE_RMCast::Join &);
+ virtual int leave (ACE_RMCast::Leave &);
// Push data down the stack
private:
diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.cpp b/protocols/ace/RMCast/RMCast_Module_Factory.cpp
new file mode 100644
index 00000000000..b749048a78c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.cpp
@@ -0,0 +1,13 @@
+// $Id$
+
+#include "RMCast_Module_Factory.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_Module_Factory.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Module_Factory, "$Id$")
+
+ACE_RMCast_Module_Factory::~ACE_RMCast_Module_Factory (void)
+{
+}
diff --git a/protocols/ace/RMCast/RMCast_Module_Factory.h b/protocols/ace/RMCast/RMCast_Module_Factory.h
new file mode 100644
index 00000000000..722ad87d678
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.h
@@ -0,0 +1,50 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// RMCast_Module_Factory.h
+//
+// = AUTHOR
+// Carlos O'Ryan <coryan@uci.edu>
+//
+// ============================================================================
+
+#ifndef ACE_RMCAST_MODULE_FACTORY_H
+#define ACE_RMCAST_MODULE_FACTORY_H
+#include "ace/pre.h"
+
+#include "RMCast.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class ACE_RMCast_Module;
+class ACE_RMCast_IO_UDP;
+
+class ACE_RMCast_Export ACE_RMCast_Module_Factory
+{
+ // = DESCRIPTION
+ //
+public:
+ virtual ~ACE_RMCast_Module_Factory (void);
+ // Destructor
+
+ virtual ACE_RMCast_Module *create (ACE_RMCast_IO_UDP *) = 0;
+ // Create a new proxy
+
+ virtual void destroy (ACE_RMCast_Module *) = 0;
+ // Destroy a proxy
+};
+
+#if defined (__ACE_INLINE__)
+#include "RMCast_Module_Factory.i"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /* ACE_RMCAST_MODULE_FACTORY_H */
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i b/protocols/ace/RMCast/RMCast_Module_Factory.i
index cfa1da318d3..cfa1da318d3 100644
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.i
+++ b/protocols/ace/RMCast/RMCast_Module_Factory.i
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.cpp b/protocols/ace/RMCast/RMCast_Reassembly.cpp
index a52791e1ebf..ba2e9b79c1a 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.cpp
+++ b/protocols/ace/RMCast/RMCast_Reassembly.cpp
@@ -34,7 +34,7 @@ ACE_RMCast_Reassembly::~ACE_RMCast_Reassembly (void)
}
int
-ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
+ACE_RMCast_Reassembly::data (ACE_RMCast::Data &data)
{
if (this->next () == 0)
return 0;
@@ -42,7 +42,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
if (data.payload->length () + data.fragment_offset > data.total_size)
{
ACE_DEBUG ((LM_DEBUG,
- "RMCast_Reassembly::put_data - invalid size\n"));
+ "RMCast_Reassembly::data - invalid size\n"));
return -1; // Corrupt message?
}
@@ -92,7 +92,7 @@ ACE_RMCast_Reassembly::put_data (ACE_RMCast::Data &data)
downstream_data.fragment_offset = 0;
downstream_data.payload = message->message_body ();
- int r = this->next ()->put_data (downstream_data);
+ int r = this->next ()->data (downstream_data);
delete message;
diff --git a/protocols/ace/RMCast/RMCast_Reassembly.h b/protocols/ace/RMCast/RMCast_Reassembly.h
index 0982d059c7c..0bf0c3a61ee 100644
--- a/protocols/ace/RMCast/RMCast_Reassembly.h
+++ b/protocols/ace/RMCast/RMCast_Reassembly.h
@@ -34,7 +34,7 @@ public:
// Destructor
// = The ACE_RMCast_Module methods
- virtual int put_data (ACE_RMCast::Data &data);
+ virtual int data (ACE_RMCast::Data &data);
private:
ACE_SYNCH_MUTEX mutex_;
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp
deleted file mode 100644
index ff1b7b33f15..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy.cpp
+++ /dev/null
@@ -1,20 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy.h"
-#include "RMCast_Module.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy, "$Id$")
-
-ACE_RMCast_Sender_Proxy::ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module)
- : module_ (module)
-{
-}
-
-ACE_RMCast_Sender_Proxy::~ACE_RMCast_Sender_Proxy (void)
-{
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.i b/protocols/ace/RMCast/RMCast_Sender_Proxy.i
deleted file mode 100644
index b47573711ea..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy.i
+++ /dev/null
@@ -1,7 +0,0 @@
-// $Id$
-
-ACE_INLINE ACE_RMCast_Module *
-ACE_RMCast_Sender_Proxy::module (void) const
-{
- return this->module_;
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp
deleted file mode 100644
index ba525f245bc..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy_Best_Effort.h"
-#include "RMCast_Module.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy_Best_Effort.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy_Best_Effort, "$Id$")
-
-ACE_RMCast_Sender_Proxy_Best_Effort::
- ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module)
- : ACE_RMCast_Sender_Proxy (module)
-{
-}
-
-ACE_RMCast_Sender_Proxy_Best_Effort::
- ~ACE_RMCast_Sender_Proxy_Best_Effort (void)
-{
-}
-
-int
-ACE_RMCast_Sender_Proxy_Best_Effort::receive_message (char *buffer,
- size_t size)
-{
- int type = buffer[0];
-
- // All control messages are ignored...
- if (type != ACE_RMCast::MT_DATA)
- return 0;
-
- // @@ Push the event through the stack
-#if 0
- ACE_DEBUG ((LM_DEBUG,
- "Proxy(%x) - received data\n", long(this)));
- ACE_HEX_DUMP ((LM_DEBUG, buffer, header, "Proxy"));
-#endif
-
- const size_t header_size = 1 + 3 * sizeof(ACE_UINT32);
- if (size < header_size)
- {
- // The message is too small
- return 0;
- }
-
- ACE_UINT32 tmp;
-
- ACE_RMCast::Data data;
-
- ACE_OS::memcpy (&tmp, buffer + 1,
- sizeof(tmp));
- data.sequence_number = ACE_NTOHL (tmp);
-
- ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp),
- sizeof(tmp));
- data.total_size = ACE_NTOHL (tmp);
-
- ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp),
- sizeof(tmp));
- data.fragment_offset = ACE_NTOHL (tmp);
-
- // Pass it up the module...
- ACE_Message_Block *mb;
- ACE_NEW_RETURN (mb, ACE_Message_Block, -1);
- mb->size (size - header_size);
- mb->copy (buffer + header_size, size - header_size);
-
- data.payload = mb;
- return this->module ()->put_data (data);
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h b/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h
deleted file mode 100644
index 304e026afc3..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Best_Effort.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_Sender_Proxy_Best_Effort.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H
-#define ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H
-#include "ace/pre.h"
-
-#include "RMCast_Sender_Proxy.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Module;
-
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Best_Effort : public ACE_RMCast_Sender_Proxy
-{
- // = TITLE
- // Reliable Multicast Sender Ambassador
- //
- // = DESCRIPTION
- // Implement an Ambassador for the reliable multicast senders.
- //
-public:
- ACE_RMCast_Sender_Proxy_Best_Effort (ACE_RMCast_Module *module);
- // Constructor
-
- ~ACE_RMCast_Sender_Proxy_Best_Effort (void);
- // Destructor
-
- virtual int receive_message (char *buffer, size_t size);
- // A DATA message was received.
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy_Best_Effort.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_BEST_EFFORT_H */
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp
deleted file mode 100644
index 48a82b5dfbc..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.cpp
+++ /dev/null
@@ -1,13 +0,0 @@
-// $Id$
-
-#include "RMCast_Sender_Proxy_Factory.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_Sender_Proxy_Factory.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Sender_Proxy_Factory, "$Id$")
-
-ACE_RMCast_Sender_Proxy_Factory::~ACE_RMCast_Sender_Proxy_Factory (void)
-{
-}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h
deleted file mode 100644
index 7dff4d2796f..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_Sender_Proxy_Factory.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_SENDER_PROXY_FACTORY_H
-#define ACE_RMCAST_SENDER_PROXY_FACTORY_H
-#include "ace/pre.h"
-
-#include "RMCast.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Sender_Proxy;
-
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy_Factory
-{
- // = DESCRIPTION
- // Defines the interface to create Sender_Proxies.
- // The application provides a Sender_Proxy_Factory, this is used
- // by the receiver side to create a different proxy for each
- // remote sender. The application configures the proxy with the
- // correct modules to process incoming events and achieve the
- // desired level of reliability.
- //
-public:
- virtual ~ACE_RMCast_Sender_Proxy_Factory (void);
- // Destructor
-
- virtual ACE_RMCast_Sender_Proxy *create (void) = 0;
- // Create a new proxy
-
- virtual void destroy (ACE_RMCast_Sender_Proxy *) = 0;
- // Destroy a proxy
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy_Factory.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_FACTORY_H */
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i b/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i
deleted file mode 100644
index cfa1da318d3..00000000000
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy_Factory.i
+++ /dev/null
@@ -1 +0,0 @@
-// $Id$
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp
index 69cfc337113..e5ff8da2761 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.cpp
@@ -3,7 +3,7 @@
//
#include "RMCast_UDP_Event_Handler.h"
-#include "RMCast_UDP_Receiver.h"
+#include "RMCast_IO_UDP.h"
#if !defined (__ACE_INLINE__)
# include "RMCast_UDP_Event_Handler.i"
@@ -18,19 +18,19 @@ ACE_RMCast_UDP_Event_Handler::~ACE_RMCast_UDP_Event_Handler (void)
ACE_HANDLE
ACE_RMCast_UDP_Event_Handler::get_handle (void) const
{
- return this->receiver_->get_handle ();
+ return this->io_udp_->get_handle ();
}
int
ACE_RMCast_UDP_Event_Handler::handle_input (ACE_HANDLE h)
{
- return this->receiver_->handle_input (h);
+ return this->io_udp_->handle_input (h);
}
int
ACE_RMCast_UDP_Event_Handler::handle_timeout (const ACE_Time_Value &,
const void *)
{
- // @@ return this->receiver_->handle_timeout ();
+ // @@ return this->io_udp_->handle_timeout ();
return 0;
}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
index 193d7038cd8..02798cee7f8 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.h
@@ -4,7 +4,7 @@
//
// = DESCRIPTION
// Implement an adapter between the ACE Reactor and the
-// ACE_RMCast_UDP_Receiver
+// ACE_RMCast_IO_UDP
//
// = AUTHOR
// Carlos O'Ryan <coryan@uci.edu>
@@ -22,13 +22,13 @@
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-class ACE_RMCast_UDP_Receiver;
+class ACE_RMCast_IO_UDP;
class ACE_INET_Addr;
class ACE_RMCast_Export ACE_RMCast_UDP_Event_Handler : public ACE_Event_Handler
{
public:
- ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver);
+ ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *receiver);
// Constructor
~ACE_RMCast_UDP_Event_Handler (void);
@@ -41,7 +41,7 @@ public:
const void *act = 0);
private:
- ACE_RMCast_UDP_Receiver *receiver_;
+ ACE_RMCast_IO_UDP *io_udp_;
// The sender
};
diff --git a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i
index b35aeefa3f4..99b4c0ac7e5 100644
--- a/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i
+++ b/protocols/ace/RMCast/RMCast_UDP_Event_Handler.i
@@ -2,8 +2,7 @@
ACE_INLINE
ACE_RMCast_UDP_Event_Handler::
-ACE_RMCast_UDP_Event_Handler (ACE_RMCast_UDP_Receiver *receiver)
- : receiver_ (receiver)
+ACE_RMCast_UDP_Event_Handler (ACE_RMCast_IO_UDP *io)
+ : io_udp_ (io)
{
}
-
diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
new file mode 100644
index 00000000000..1fbad27f2cd
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.cpp
@@ -0,0 +1,136 @@
+// $Id$
+
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_UDP_Proxy.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_UDP_Proxy, "$Id$")
+
+ACE_RMCast_UDP_Proxy::ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp,
+ const ACE_INET_Addr &addr,
+ ACE_RMCast_Module *module)
+ : io_udp_ (io_udp)
+ , peer_addr_ (addr)
+ , module_ (module)
+{
+}
+
+ACE_RMCast_UDP_Proxy::~ACE_RMCast_UDP_Proxy (void)
+{
+}
+
+int
+ACE_RMCast_UDP_Proxy::receive_message (char *buffer, size_t size)
+{
+ int type = buffer[0];
+
+ // @@ What should we do with invalid messages like this?
+ //
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ return 0;
+
+ if (type == ACE_RMCast::MT_POLL)
+ {
+ ACE_RMCast::Poll poll;
+ return this->module ()->poll (poll);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK_JOIN)
+ {
+ ACE_RMCast::Ack_Join ack_join;
+ const size_t header_size = 1 + sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ ack_join.next_sequence_number = ACE_NTOHL (tmp);
+ return this->module ()->ack_join (ack_join);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ ACE_RMCast::Ack_Leave ack_leave;
+ return this->module ()->ack_leave (ack_leave);
+ }
+
+ else if (type == ACE_RMCast::MT_DATA)
+ {
+ ACE_RMCast::Data data;
+ const size_t header_size = 1 + 3 * sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ data.sequence_number = ACE_NTOHL (tmp);
+
+ ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(tmp),
+ sizeof(tmp));
+ data.total_size = ACE_NTOHL (tmp);
+
+ ACE_OS::memcpy (&tmp, buffer + 1 + 2 * sizeof(tmp),
+ sizeof(tmp));
+ data.fragment_offset = ACE_NTOHL (tmp);
+
+ // Pass it up the module...
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb, ACE_Message_Block, -1);
+ mb->size (size - header_size);
+ mb->copy (buffer + header_size, size - header_size);
+
+ data.payload = mb;
+ return this->module ()->data (data);
+ }
+
+ else if (type == ACE_RMCast::MT_JOIN)
+ {
+ ACE_RMCast::Join join;
+ return this->module ()->join (join);
+ }
+
+ else if (type == ACE_RMCast::MT_LEAVE)
+ {
+ ACE_RMCast::Leave leave;
+ return this->module ()->leave (leave);
+ }
+
+ else if (type == ACE_RMCast::MT_ACK)
+ {
+ ACE_RMCast::Ack ack;
+
+ const size_t header_size = 1 + sizeof(ACE_UINT32);
+ if (size < header_size)
+ {
+ // The message is too small
+ return 0;
+ }
+
+ ACE_UINT32 tmp;
+
+ ACE_OS::memcpy (&tmp, buffer + 1,
+ sizeof(tmp));
+ ack.highest_in_sequence = ACE_NTOHL (tmp);
+ ACE_OS::memcpy (&tmp, buffer + 1 + sizeof(ACE_UINT32),
+ sizeof(tmp));
+ ack.highest_received = ACE_NTOHL (tmp);
+
+ return this->module ()->ack (ack);
+ }
+
+ return 0;
+}
diff --git a/protocols/ace/RMCast/RMCast_Sender_Proxy.h b/protocols/ace/RMCast/RMCast_UDP_Proxy.h
index c6b51f78b48..aa7e08c65be 100644
--- a/protocols/ace/RMCast/RMCast_Sender_Proxy.h
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.h
@@ -7,26 +7,28 @@
// ace
//
// = FILENAME
-// RMCast_Sender_Proxy.h
+// RMCast_UDP_Proxy.h
//
// = AUTHOR
// Carlos O'Ryan <coryan@uci.edu>
//
// ============================================================================
-#ifndef ACE_RMCAST_SENDER_PROXY_H
-#define ACE_RMCAST_SENDER_PROXY_H
+#ifndef ACE_RMCAST_UDP_PROXY_H
+#define ACE_RMCAST_UDP_PROXY_H
#include "ace/pre.h"
#include "RMCast.h"
+#include "ace/INET_Addr.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
class ACE_RMCast_Module;
+class ACE_RMCast_IO_UDP;
-class ACE_RMCast_Export ACE_RMCast_Sender_Proxy
+class ACE_RMCast_Export ACE_RMCast_UDP_Proxy
{
// = TITLE
// Reliable Multicast Sender Ambassador
@@ -35,27 +37,37 @@ class ACE_RMCast_Export ACE_RMCast_Sender_Proxy
// Implement an Ambassador for the reliable multicast senders.
//
public:
- ACE_RMCast_Sender_Proxy (ACE_RMCast_Module *module);
+ ACE_RMCast_UDP_Proxy (ACE_RMCast_IO_UDP *io_udp,
+ const ACE_INET_Addr &peer_addr,
+ ACE_RMCast_Module *module);
// Constructor
- virtual ~ACE_RMCast_Sender_Proxy (void);
+ virtual ~ACE_RMCast_UDP_Proxy (void);
// Destructor
- ACE_RMCast_Module *module (void) const;
- // Return the internal module
+ int receive_message (char *buffer, size_t size);
+ // Receive the message
+
+ const ACE_INET_Addr &peer_addr (void) const;
+ // The address of the peer
- virtual int receive_message (char *buffer, size_t size) = 0;
- // A new message has been received, process it
+ ACE_RMCast_Module *module (void) const;
+ // The module
private:
+ ACE_RMCast_IO_UDP *io_udp_;
+ // The IO facade
+
+ ACE_INET_Addr peer_addr_;
+ // The address of the peer
+
ACE_RMCast_Module *module_;
- // Process the data, control messages are processed by the Sender
- // proxy
+ // Process the data and control messages.
};
#if defined (__ACE_INLINE__)
-#include "RMCast_Sender_Proxy.i"
+#include "RMCast_UDP_Proxy.i"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
-#endif /* ACE_RMCAST_SENDER_PROXY_H */
+#endif /* ACE_RMCAST_UDP_PROXY_H */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Proxy.i b/protocols/ace/RMCast/RMCast_UDP_Proxy.i
new file mode 100644
index 00000000000..8ef6142ed7c
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_UDP_Proxy.i
@@ -0,0 +1,13 @@
+// $Id$
+
+ACE_INLINE const ACE_INET_Addr&
+ACE_RMCast_UDP_Proxy::peer_addr (void) const
+{
+ return this->peer_addr_;
+}
+
+ACE_INLINE ACE_RMCast_Module *
+ACE_RMCast_UDP_Proxy::module (void) const
+{
+ return this->module_;
+}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp b/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp
deleted file mode 100644
index eeb03f50bcf..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Receiver.cpp
+++ /dev/null
@@ -1,241 +0,0 @@
-//
-// $Id$
-//
-
-#include "RMCast_UDP_Receiver.h"
-#include "RMCast_Sender_Proxy.h"
-#include "RMCast_Sender_Proxy_Factory.h"
-#include "ace/Handle_Set.h"
-#include "ace/Reactor.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_UDP_Receiver.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_Receiver, "$Id$")
-
-ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void)
-{
-}
-
-int
-ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr,
- int reuse_addr,
- const ACE_TCHAR *net_if,
- int protocol_family,
- int protocol)
-{
- return this->dgram_.subscribe (mcast_addr,
- reuse_addr,
- net_if,
- protocol_family,
- protocol);
-}
-
-int
-ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv)
-{
- ACE_HANDLE h = this->dgram_.get_handle ();
- if (h == ACE_INVALID_HANDLE)
- return -1;
-
- ACE_Handle_Set handle_set;
- handle_set.set_bit (h);
-
- ACE_Countdown_Time countdown (tv);
-
- int r = ACE_OS::select (int(h) + 1,
- handle_set, 0, 0,
- tv);
- if (r == -1)
- {
- if (errno == EINTR)
- return 0;
- else
- return -1;
- }
- else if (r == 0)
- {
- return 0;
- }
-
- return this->handle_input (h);
-}
-
-int
-ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor)
-{
- this->eh_.reactor (reactor);
- return reactor->register_handler (&this->eh_,
- ACE_Event_Handler::READ_MASK);
-}
-
-int
-ACE_RMCast_UDP_Receiver::remove_handlers (void)
-{
- ACE_Reactor *r = this->eh_.reactor ();
- if (r != 0)
- {
- r->remove_handler (&this->eh_,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
- this->eh_.reactor (0);
- }
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE)
-{
- // @@ We should use a system constant instead of this literal
- const int max_udp_packet_size = 65536;
- char buffer[max_udp_packet_size];
-
- ACE_INET_Addr from_address;
- ssize_t r =
- this->dgram_.recv (buffer, sizeof(buffer), from_address);
-
- if (r == -1)
- {
- // @@ LOG??
- ACE_DEBUG ((LM_DEBUG,
- "RMCast_UDP_Receiver::handle_input () - "
- "error in recv\n"));
- return -1;
- }
-
- // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
-
- // @@ Locking!
-
- int type = buffer[0];
-
- ACE_RMCast_Sender_Proxy *sender_proxy;
- if (this->map_.find (from_address, sender_proxy) != 0)
- {
- // State == RS_NON_EXISTENT
-
- if (type == ACE_RMCast::MT_ACK
- || type == ACE_RMCast::MT_JOIN
- || type == ACE_RMCast::MT_LEAVE
- || type == ACE_RMCast::MT_ACK_LEAVE)
- {
- // All these message types indicate a problem, the should be
- // generated by receivers, not received by them.
- return 0;
- }
-
- // The message type is valid, we must create a new proxy,
- // initially in the JOINING state...
- sender_proxy =
- this->factory_->create ();
- if (sender_proxy == 0)
- {
- // @@ LOG??
- return 0;
- }
- if (this->map_.bind (from_address, sender_proxy) != 0)
- {
- // @@ LOG??
- return 0;
- }
-
- // Send back a JOIN message...
- return sender_proxy->receive_message (buffer, r);
- }
-
- if (type == ACE_RMCast::MT_ACK
- || type == ACE_RMCast::MT_JOIN
- || type == ACE_RMCast::MT_LEAVE
- || type == ACE_RMCast::MT_ACK_LEAVE
- || type < 0
- || type >= ACE_RMCast::MT_LAST)
- {
- // In this case the message is invalid, but the proxy is already
- // in the table, must destroy it because there was a violation
- // in the protocol....
-
- this->factory_->destroy (sender_proxy);
- this->map_.unbind (from_address);
- return 0;
- }
-
- return sender_proxy->receive_message (buffer, r);
-}
-
-ACE_HANDLE
-ACE_RMCast_UDP_Receiver::get_handle (void) const
-{
- return this->dgram_.get_handle ();
-}
-
-#if 0
-int
-ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_JOIN;
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer, 1, from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
- ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_ACK;
-
- ACE_UINT32 expected = sender_proxy->expected ();
- expected = ACE_HTONL (expected);
-
- ACE_UINT32 last_received = sender_proxy->last_received ();
- last_received = ACE_HTONL (last_received);
-
- ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected));
- ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received,
- sizeof(last_received));
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer,
- + sizeof(expected)
- + sizeof(last_received),
- from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-
-int
-ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from)
-{
- char buffer[16];
- buffer[0] = ACE_RMCast::MT_LEAVE;
-
- ACE_SOCK_Dgram &dgram = this->dgram_;
- ssize_t r = dgram.send (buffer, 1, from);
-
- if (r == -1)
- return -1;
-
- return 0;
-}
-#endif /* 0 */
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*>;
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Receiver.i b/protocols/ace/RMCast/RMCast_UDP_Receiver.i
deleted file mode 100644
index 81aeb8e2752..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Receiver.i
+++ /dev/null
@@ -1,9 +0,0 @@
-// $Id$
-
-ACE_INLINE
-ACE_RMCast_UDP_Receiver::
- ACE_RMCast_UDP_Receiver (ACE_RMCast_Sender_Proxy_Factory *factory)
- : factory_ (factory)
- , eh_ (this)
-{
-}
diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.cpp b/protocols/ace/RMCast/RMCast_UDP_Sender.cpp
deleted file mode 100644
index c02ad6fb9cf..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Sender.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-//
-// $Id$
-//
-
-#include "RMCast_UDP_Sender.h"
-#include "RMCast.h"
-#include "ace/Message_Block.h"
-
-#if !defined (__ACE_INLINE__)
-# include "RMCast_UDP_Sender.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID(ace, RMCast_UDP_Sender, "$Id$")
-
-ACE_RMCast_UDP_Sender::ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr)
- : ACE_RMCast_Module ()
- , mcast_addr_ (mcast_addr)
-{
-}
-
-ACE_RMCast_UDP_Sender::~ACE_RMCast_UDP_Sender (void)
-{
-}
-
-int
-ACE_RMCast_UDP_Sender::open (void)
-{
- return this->dgram_.open (ACE_Addr::sap_any);
-}
-
-int
-ACE_RMCast_UDP_Sender::close (void)
-{
- return this->dgram_.close ();
-}
-
-int
-ACE_RMCast_UDP_Sender::put_data (ACE_RMCast::Data &data)
-{
- // The first message block contains the header
- // @@ TODO: We could keep the header pre-initialized, and only
- // update the portions that do change...
- ACE_UINT32 tmp;
- char header[1 + 3 * sizeof(ACE_UINT32)];
- header[0] = ACE_RMCast::MT_DATA;
-
- tmp = ACE_HTONL (data.sequence_number);
- ACE_OS::memcpy (header + 1,
- &tmp, sizeof(ACE_UINT32));
- tmp = ACE_HTONL (data.total_size);
- ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
- &tmp, sizeof(ACE_UINT32));
- tmp = ACE_HTONL (data.fragment_offset);
- ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
- &tmp, sizeof(ACE_UINT32));
-
- iovec iov[IOV_MAX];
- int iovcnt = 1;
-
- iov[0].iov_base = header;
- iov[0].iov_len = sizeof(header);
-
- ACE_Message_Block *mb = data.payload;
-
- for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
- {
- iov[iovcnt].iov_base = i->rd_ptr ();
- iov[iovcnt].iov_len = i->length ();
- iovcnt++;
- if (iovcnt >= IOV_MAX)
- return -1;
- }
-
- ACE_Time_Value tv (0, 10000);
- ACE_OS::sleep (tv);
- if (this->dgram_.send (iov, iovcnt,
- this->mcast_addr_) == -1)
- return -1;
-
-#if 0
- ACE_HEX_DUMP ((LM_DEBUG,
- (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
-#endif
-
- return 0;
-}
-
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.h b/protocols/ace/RMCast/RMCast_UDP_Sender.h
deleted file mode 100644
index 474ebbc7f27..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Sender.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/* -*- C++ -*- */
-// $Id$
-
-// ============================================================================
-//
-// = LIBRARY
-// ace
-//
-// = FILENAME
-// RMCast_UDP_Sender.h
-//
-// = AUTHOR
-// Carlos O'Ryan <coryan@uci.edu>
-//
-// ============================================================================
-
-#ifndef ACE_RMCAST_UDP_SENDER_H
-#define ACE_RMCAST_UDP_SENDER_H
-#include "ace/pre.h"
-
-#include "RMCast_Module.h"
-#include "ace/SOCK_Dgram.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-class ACE_RMCast_Export ACE_RMCast_UDP_Sender : public ACE_RMCast_Module
-{
- // = TITLE
- // Reliable Multicast UDP_Sender
- //
- // = DESCRIPTION
- // Implements a Facade to the classes that implement a reliable
- // multicast protocol.
-public:
- // = Initialization and termination methods.
- ACE_RMCast_UDP_Sender (const ACE_INET_Addr &mcast_addr);
- // Constructor
-
- virtual ~ACE_RMCast_UDP_Sender (void);
- // Destructor
-
- // = The RMCast_Module methods
- virtual int open (void);
- virtual int close (void);
- virtual int put_data (ACE_RMCast::Data &data);
- // Send the Message block, this is the callback invoked at the end
- // of the stack.
-
-protected:
- ACE_SOCK_Dgram dgram_;
- // This is the socket used to send the multicast data.
- // @@ This should be strategized, what if we want to use something
- // like ATM networks to send the data, then the types would be
- // different....
-
- ACE_INET_Addr mcast_addr_;
- // The multicast group we send to.
- // @@ Can we really strategize the addressing, without introducing
- // too much complexity? How can we decouple the reliability aspect
- // from the transport aspects of the system???
-};
-
-#if defined (__ACE_INLINE__)
-#include "RMCast_UDP_Sender.i"
-#endif /* __ACE_INLINE__ */
-
-#include "ace/post.h"
-#endif /* ACE_RMCAST_UDP_SENDER_H */
diff --git a/protocols/ace/RMCast/RMCast_UDP_Sender.i b/protocols/ace/RMCast/RMCast_UDP_Sender.i
deleted file mode 100644
index cfa1da318d3..00000000000
--- a/protocols/ace/RMCast/RMCast_UDP_Sender.i
+++ /dev/null
@@ -1 +0,0 @@
-// $Id$