summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/RMCast_IO_UDP.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/ace/RMCast/RMCast_IO_UDP.cpp')
-rw-r--r--protocols/ace/RMCast/RMCast_IO_UDP.cpp418
1 files changed, 418 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
new file mode 100644
index 00000000000..89cc7ae3c3a
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp
@@ -0,0 +1,418 @@
+//
+// $Id$
+//
+
+#include "RMCast_IO_UDP.h"
+#include "RMCast_UDP_Proxy.h"
+#include "RMCast_Module_Factory.h"
+#include "ace/Handle_Set.h"
+#include "ace/Reactor.h"
+#include "ace/Message_Block.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_IO_UDP.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_IO_UDP, "$Id$")
+
+ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void)
+{
+}
+
+int
+ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr,
+ int reuse_addr,
+ const ACE_TCHAR *net_if,
+ int protocol_family,
+ int protocol)
+{
+ this->mcast_group_ = mcast_addr;
+ return this->dgram_.subscribe (mcast_addr,
+ reuse_addr,
+ net_if,
+ protocol_family,
+ protocol);
+}
+
+int
+ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv)
+{
+ ACE_HANDLE h = this->dgram_.get_handle ();
+ if (h == ACE_INVALID_HANDLE)
+ return -1;
+
+ ACE_Handle_Set handle_set;
+ handle_set.set_bit (h);
+
+ ACE_Countdown_Time countdown (tv);
+
+ int r = ACE_OS::select (int(h) + 1,
+ handle_set, 0, 0,
+ tv);
+ if (r == -1)
+ {
+ if (errno == EINTR)
+ return 0;
+ else
+ return -1;
+ }
+ else if (r == 0)
+ {
+ return 0;
+ }
+
+ return this->handle_input (h);
+}
+
+int
+ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor)
+{
+ this->eh_.reactor (reactor);
+ return reactor->register_handler (&this->eh_,
+ ACE_Event_Handler::READ_MASK);
+}
+
+int
+ACE_RMCast_IO_UDP::remove_handlers (void)
+{
+ ACE_Reactor *r = this->eh_.reactor ();
+ if (r != 0)
+ {
+ r->remove_handler (&this->eh_,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ this->eh_.reactor (0);
+ }
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE)
+{
+ // @@ We should use a system constant instead of this literal
+ const int max_udp_packet_size = 65536;
+ char buffer[max_udp_packet_size];
+
+ ACE_INET_Addr from_address;
+ ssize_t r =
+ this->dgram_.recv (buffer, sizeof(buffer), from_address);
+
+ if (r == -1)
+ {
+ // @@ LOG??
+ ACE_DEBUG ((LM_DEBUG,
+ "RMCast_IO_UDP::handle_input () - "
+ "error in recv\n"));
+ return -1;
+ }
+
+ // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
+
+ // @@ Locking!
+
+ int type = buffer[0];
+
+ if (type < 0 || type >= ACE_RMCast::MT_LAST)
+ {
+ // @@ Log: invalid message type!!
+ // @@ TODO: should we return -1? The socket is still valid, it
+ // makes little sense to destroy it just because one remote
+ // sender is sending invalid messages. Maybe we should
+ // strategize this too, and report the problem to the
+ // application, this could indicate a misconfiguration or
+ // something worse...
+
+ // In any case the proxy should be destroyed, its peer is making
+ // something really wrong.
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.unbind (from_address, proxy) == 0)
+ {
+ this->factory_->destroy (proxy->module ());
+ delete proxy;
+ }
+ return 0;
+ }
+
+ ACE_RMCast_UDP_Proxy *proxy;
+ if (this->map_.find (from_address, proxy) != 0)
+ {
+ // State == RS_NON_EXISTENT
+
+ // @@ We should validate the message *before* creating the
+ // object, all we need is some sort of validation strategy, a
+ // different one for the receiver and another one for the
+ // sender.
+#if 0
+ if (type == ACE_RMCast::MT_ACK
+ || type == ACE_RMCast::MT_JOIN
+ || type == ACE_RMCast::MT_LEAVE
+ || type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ // All these message types indicate a problem, the should be
+ // generated by receivers, not received by them.
+ return 0;
+ }
+#endif /* 0 */
+
+ // The message type is valid, we must create a new proxy,
+ // initially in the JOINING state...
+ ACE_RMCast_Module *module = this->factory_->create (this);
+ if (module == 0)
+ {
+ // @@ LOG??
+ // Try to continue working, maybe the module can be created
+ // later.
+ return 0;
+ }
+ ACE_NEW_RETURN (proxy,
+ ACE_RMCast_UDP_Proxy(this,
+ from_address,
+ module),
+ 0);
+
+ if (this->map_.bind (from_address, proxy) != 0)
+ {
+ // @@ LOG??
+ return 0;
+ }
+
+ }
+
+ // Have the proxy process the message and do the right thing.
+ return proxy->receive_message (buffer, r);
+}
+
+ACE_HANDLE
+ACE_RMCast_IO_UDP::get_handle (void) const
+{
+ return this->dgram_.get_handle ();
+}
+
+int
+ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data)
+{
+ return this->send_data (data, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll)
+{
+ return this->send_poll (poll, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join)
+{
+ return this->send_ack_join (ack_join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave)
+{
+ return this->send_ack_leave (ack_leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack)
+{
+ return this->send_ack (ack, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join)
+{
+ return this->send_join (join, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave)
+{
+ return this->send_leave (leave, this->mcast_group_);
+}
+
+int
+ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data,
+ const ACE_INET_Addr &to)
+{
+ // The first message block contains the header
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ ACE_UINT32 tmp;
+ char header[1 + 3 * sizeof(ACE_UINT32)];
+ header[0] = ACE_RMCast::MT_DATA;
+
+ tmp = ACE_HTONL (data.sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.total_size);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (data.fragment_offset);
+ ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ iovec iov[IOV_MAX];
+ int iovcnt = 1;
+
+ iov[0].iov_base = header;
+ iov[0].iov_len = sizeof(header);
+
+ ACE_Message_Block *mb = data.payload;
+
+ for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
+ {
+ iov[iovcnt].iov_base = i->rd_ptr ();
+ iov[iovcnt].iov_len = i->length ();
+ iovcnt++;
+ if (iovcnt >= IOV_MAX)
+ return -1;
+ }
+
+ // @@ This pacing stuff here reduced the number of packet lost in
+ // loopback tests, but it should be taken out for real applications
+ // (or at least made configurable!)
+ ACE_Time_Value tv (0, 10000);
+ ACE_OS::sleep (tv);
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (iov, iovcnt, to) == -1)
+ return -1;
+
+#if 0
+ ACE_HEX_DUMP ((LM_DEBUG,
+ (char*)iov[0].iov_base, iov[0].iov_len, "Sending"));
+#endif
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &poll,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_POLL;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_JOIN;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &ack_leave,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_ACK;
+
+ ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence);
+ ACE_OS::memcpy (header + 1,
+ &tmp, sizeof(ACE_UINT32));
+ tmp = ACE_HTONL (ack.highest_received);
+ ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32),
+ &tmp, sizeof(ACE_UINT32));
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &join,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_JOIN;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &,
+ const ACE_INET_Addr &to)
+{
+ // @@ TODO: We could keep the header pre-initialized, and only
+ // update the portions that do change...
+ char header[16];
+ header[0] = ACE_RMCast::MT_LEAVE;
+
+ // ACE_SOCK_MCast_Dgram disallows sending, but it actually works.
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+
+ if (dgram.send (header, 1, to) == -1)
+ return -1;
+
+ return 0;
+}
+
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>;
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */