diff options
Diffstat (limited to 'protocols/ace/RMCast/RMCast_IO_UDP.cpp')
-rw-r--r-- | protocols/ace/RMCast/RMCast_IO_UDP.cpp | 418 |
1 files changed, 418 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/RMCast_IO_UDP.cpp b/protocols/ace/RMCast/RMCast_IO_UDP.cpp new file mode 100644 index 00000000000..89cc7ae3c3a --- /dev/null +++ b/protocols/ace/RMCast/RMCast_IO_UDP.cpp @@ -0,0 +1,418 @@ +// +// $Id$ +// + +#include "RMCast_IO_UDP.h" +#include "RMCast_UDP_Proxy.h" +#include "RMCast_Module_Factory.h" +#include "ace/Handle_Set.h" +#include "ace/Reactor.h" +#include "ace/Message_Block.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_IO_UDP.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_IO_UDP, "$Id$") + +ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void) +{ +} + +int +ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr, + int reuse_addr, + const ACE_TCHAR *net_if, + int protocol_family, + int protocol) +{ + this->mcast_group_ = mcast_addr; + return this->dgram_.subscribe (mcast_addr, + reuse_addr, + net_if, + protocol_family, + protocol); +} + +int +ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv) +{ + ACE_HANDLE h = this->dgram_.get_handle (); + if (h == ACE_INVALID_HANDLE) + return -1; + + ACE_Handle_Set handle_set; + handle_set.set_bit (h); + + ACE_Countdown_Time countdown (tv); + + int r = ACE_OS::select (int(h) + 1, + handle_set, 0, 0, + tv); + if (r == -1) + { + if (errno == EINTR) + return 0; + else + return -1; + } + else if (r == 0) + { + return 0; + } + + return this->handle_input (h); +} + +int +ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor) +{ + this->eh_.reactor (reactor); + return reactor->register_handler (&this->eh_, + ACE_Event_Handler::READ_MASK); +} + +int +ACE_RMCast_IO_UDP::remove_handlers (void) +{ + ACE_Reactor *r = this->eh_.reactor (); + if (r != 0) + { + r->remove_handler (&this->eh_, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + this->eh_.reactor (0); + } + return 0; +} + +int +ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE) +{ + // @@ We should use a system constant instead of this literal + const int max_udp_packet_size = 65536; + char buffer[max_udp_packet_size]; + + ACE_INET_Addr from_address; + ssize_t r = + this->dgram_.recv (buffer, sizeof(buffer), from_address); + + if (r == -1) + { + // @@ LOG?? + ACE_DEBUG ((LM_DEBUG, + "RMCast_IO_UDP::handle_input () - " + "error in recv\n")); + return -1; + } + + // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input")); + + // @@ Locking! + + int type = buffer[0]; + + if (type < 0 || type >= ACE_RMCast::MT_LAST) + { + // @@ Log: invalid message type!! + // @@ TODO: should we return -1? The socket is still valid, it + // makes little sense to destroy it just because one remote + // sender is sending invalid messages. Maybe we should + // strategize this too, and report the problem to the + // application, this could indicate a misconfiguration or + // something worse... + + // In any case the proxy should be destroyed, its peer is making + // something really wrong. + ACE_RMCast_UDP_Proxy *proxy; + if (this->map_.unbind (from_address, proxy) == 0) + { + this->factory_->destroy (proxy->module ()); + delete proxy; + } + return 0; + } + + ACE_RMCast_UDP_Proxy *proxy; + if (this->map_.find (from_address, proxy) != 0) + { + // State == RS_NON_EXISTENT + + // @@ We should validate the message *before* creating the + // object, all we need is some sort of validation strategy, a + // different one for the receiver and another one for the + // sender. +#if 0 + if (type == ACE_RMCast::MT_ACK + || type == ACE_RMCast::MT_JOIN + || type == ACE_RMCast::MT_LEAVE + || type == ACE_RMCast::MT_ACK_LEAVE) + { + // All these message types indicate a problem, the should be + // generated by receivers, not received by them. + return 0; + } +#endif /* 0 */ + + // The message type is valid, we must create a new proxy, + // initially in the JOINING state... + ACE_RMCast_Module *module = this->factory_->create (this); + if (module == 0) + { + // @@ LOG?? + // Try to continue working, maybe the module can be created + // later. + return 0; + } + ACE_NEW_RETURN (proxy, + ACE_RMCast_UDP_Proxy(this, + from_address, + module), + 0); + + if (this->map_.bind (from_address, proxy) != 0) + { + // @@ LOG?? + return 0; + } + + } + + // Have the proxy process the message and do the right thing. + return proxy->receive_message (buffer, r); +} + +ACE_HANDLE +ACE_RMCast_IO_UDP::get_handle (void) const +{ + return this->dgram_.get_handle (); +} + +int +ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data) +{ + return this->send_data (data, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll) +{ + return this->send_poll (poll, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join) +{ + return this->send_ack_join (ack_join, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave) +{ + return this->send_ack_leave (ack_leave, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack) +{ + return this->send_ack (ack, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join) +{ + return this->send_join (join, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave) +{ + return this->send_leave (leave, this->mcast_group_); +} + +int +ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data, + const ACE_INET_Addr &to) +{ + // The first message block contains the header + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + ACE_UINT32 tmp; + char header[1 + 3 * sizeof(ACE_UINT32)]; + header[0] = ACE_RMCast::MT_DATA; + + tmp = ACE_HTONL (data.sequence_number); + ACE_OS::memcpy (header + 1, + &tmp, sizeof(ACE_UINT32)); + tmp = ACE_HTONL (data.total_size); + ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), + &tmp, sizeof(ACE_UINT32)); + tmp = ACE_HTONL (data.fragment_offset); + ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32), + &tmp, sizeof(ACE_UINT32)); + + iovec iov[IOV_MAX]; + int iovcnt = 1; + + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + + ACE_Message_Block *mb = data.payload; + + for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ()) + { + iov[iovcnt].iov_base = i->rd_ptr (); + iov[iovcnt].iov_len = i->length (); + iovcnt++; + if (iovcnt >= IOV_MAX) + return -1; + } + + // @@ This pacing stuff here reduced the number of packet lost in + // loopback tests, but it should be taken out for real applications + // (or at least made configurable!) + ACE_Time_Value tv (0, 10000); + ACE_OS::sleep (tv); + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (iov, iovcnt, to) == -1) + return -1; + +#if 0 + ACE_HEX_DUMP ((LM_DEBUG, + (char*)iov[0].iov_base, iov[0].iov_len, "Sending")); +#endif + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &poll, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_POLL; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_ACK_JOIN; + + ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number); + ACE_OS::memcpy (header + 1, + &tmp, sizeof(ACE_UINT32)); + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &ack_leave, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_ACK_LEAVE; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_ACK; + + ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence); + ACE_OS::memcpy (header + 1, + &tmp, sizeof(ACE_UINT32)); + tmp = ACE_HTONL (ack.highest_received); + ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), + &tmp, sizeof(ACE_UINT32)); + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &join, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_JOIN; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &, + const ACE_INET_Addr &to) +{ + // @@ TODO: We could keep the header pre-initialized, and only + // update the portions that do change... + char header[16]; + header[0] = ACE_RMCast::MT_LEAVE; + + // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. + ACE_SOCK_Dgram &dgram = this->dgram_; + + if (dgram.send (header, 1, to) == -1) + return -1; + + return 0; +} + + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>; + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |