diff options
Diffstat (limited to 'ace/RMCast/RMCast_IO_UDP.cpp')
-rw-r--r-- | ace/RMCast/RMCast_IO_UDP.cpp | 418 |
1 files changed, 0 insertions, 418 deletions
diff --git a/ace/RMCast/RMCast_IO_UDP.cpp b/ace/RMCast/RMCast_IO_UDP.cpp deleted file mode 100644 index 476e73d60ea..00000000000 --- a/ace/RMCast/RMCast_IO_UDP.cpp +++ /dev/null @@ -1,418 +0,0 @@ -// -// $Id$ -// - -#include "RMCast_IO_UDP.h" -#include "RMCast_UDP_Proxy.h" -#include "RMCast_Module_Factory.h" -#include "ace/Handle_Set.h" -#include "ace/Reactor.h" -#include "ace/Message_Block.h" - -#if !defined (__ACE_INLINE__) -# include "RMCast_IO_UDP.i" -#endif /* ! __ACE_INLINE__ */ - -ACE_RCSID(ace, RMCast_IO_UDP, "$Id$") - -ACE_RMCast_IO_UDP::~ACE_RMCast_IO_UDP (void) -{ -} - -int -ACE_RMCast_IO_UDP::subscribe (const ACE_INET_Addr &mcast_addr, - int reuse_addr, - const ACE_TCHAR *net_if, - int protocol_family, - int protocol) -{ - this->mcast_group_ = mcast_addr; - return this->dgram_.subscribe (mcast_addr, - reuse_addr, - net_if, - protocol_family, - protocol); -} - -int -ACE_RMCast_IO_UDP::handle_events (ACE_Time_Value *tv) -{ - ACE_HANDLE h = this->dgram_.get_handle (); - if (h == ACE_INVALID_HANDLE) - return -1; - - ACE_Handle_Set handle_set; - handle_set.set_bit (h); - - ACE_Countdown_Time countdown (tv); - - int r = ACE_OS::select (int(h) + 1, - handle_set, 0, 0, - tv); - if (r == -1) - { - if (errno == EINTR) - return 0; - else - return -1; - } - else if (r == 0) - { - return 0; - } - - return this->handle_input (h); -} - -int -ACE_RMCast_IO_UDP::register_handlers (ACE_Reactor *reactor) -{ - this->eh_.reactor (reactor); - return reactor->register_handler (&this->eh_, - ACE_Event_Handler::READ_MASK); -} - -int -ACE_RMCast_IO_UDP::remove_handlers (void) -{ - ACE_Reactor *r = this->eh_.reactor (); - if (r != 0) - { - r->remove_handler (&this->eh_, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - this->eh_.reactor (0); - } - return 0; -} - -int -ACE_RMCast_IO_UDP::handle_input (ACE_HANDLE) -{ - // @@ We should use a system constant instead of this literal - const int max_udp_packet_size = 65536; - char buffer[max_udp_packet_size]; - - ACE_INET_Addr from_address; - ssize_t r = - this->dgram_.recv (buffer, sizeof(buffer), from_address); - - if (r == -1) - { - // @@ LOG?? - ACE_DEBUG ((LM_DEBUG, - "RMCast_IO_UDP::handle_input () - " - "error in recv\n")); - return -1; - } - - // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input")); - - // @@ Locking! - - int type = buffer[0]; - - if (type < 0 || type >= ACE_RMCast::MT_LAST) - { - // @@ Log: invalid message type!! - // @@ TODO: should we return -1? The socket is still valid, it - // makes little sense to destroy it just because one remote - // sender is sending invalid messages. Maybe we should - // strategize this too, and report the problem to the - // application, this could indicate a misconfiguration or - // something worse... - - // In any case the proxy should be destroyed, its peer is making - // something really wrong. - ACE_RMCast_UDP_Proxy *proxy; - if (this->map_.unbind (from_address, proxy) == 0) - { - this->factory_->destroy (proxy->module ()); - delete proxy; - } - return 0; - } - - ACE_RMCast_UDP_Proxy *proxy; - if (this->map_.find (from_address, proxy) != 0) - { - // State == RS_NON_EXISTENT - - // @@ We should validate the message *before* creating the - // object, all we need is some sort of validation strategy, a - // different one for the receiver and another one for the - // sender. -#if 0 - if (type == ACE_RMCast::MT_ACK - || type == ACE_RMCast::MT_JOIN - || type == ACE_RMCast::MT_LEAVE - || type == ACE_RMCast::MT_ACK_LEAVE) - { - // All these message types indicate a problem, the should be - // generated by receivers, not received by them. - return 0; - } -#endif /* 0 */ - - // The message type is valid, we must create a new proxy, - // initially in the JOINING state... - ACE_RMCast_Module *module = this->factory_->create (this); - if (module == 0) - { - // @@ LOG?? - // Try to continue working, maybe the module can be created - // later. - return 0; - } - ACE_NEW_RETURN (proxy, - ACE_RMCast_UDP_Proxy(this, - from_address, - module), - 0); - - if (this->map_.bind (from_address, proxy) != 0) - { - // @@ LOG?? - return 0; - } - - } - - // Have the proxy process the message and do the right thing. - return proxy->receive_message (buffer, r); -} - -ACE_HANDLE -ACE_RMCast_IO_UDP::get_handle (void) const -{ - return this->dgram_.get_handle (); -} - -int -ACE_RMCast_IO_UDP::data (ACE_RMCast::Data &data) -{ - return this->send_data (data, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::poll (ACE_RMCast::Poll &poll) -{ - return this->send_poll (poll, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::ack_join (ACE_RMCast::Ack_Join &ack_join) -{ - return this->send_ack_join (ack_join, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::ack_leave (ACE_RMCast::Ack_Leave &ack_leave) -{ - return this->send_ack_leave (ack_leave, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::ack (ACE_RMCast::Ack &ack) -{ - return this->send_ack (ack, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::join (ACE_RMCast::Join &join) -{ - return this->send_join (join, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::leave (ACE_RMCast::Leave &leave) -{ - return this->send_leave (leave, this->mcast_group_); -} - -int -ACE_RMCast_IO_UDP::send_data (ACE_RMCast::Data &data, - const ACE_INET_Addr &to) -{ - // The first message block contains the header - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - ACE_UINT32 tmp; - char header[1 + 3 * sizeof(ACE_UINT32)]; - header[0] = ACE_RMCast::MT_DATA; - - tmp = ACE_HTONL (data.sequence_number); - ACE_OS::memcpy (header + 1, - &tmp, sizeof(ACE_UINT32)); - tmp = ACE_HTONL (data.total_size); - ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), - &tmp, sizeof(ACE_UINT32)); - tmp = ACE_HTONL (data.fragment_offset); - ACE_OS::memcpy (header + 1 + 2 * sizeof(ACE_UINT32), - &tmp, sizeof(ACE_UINT32)); - - iovec iov[IOV_MAX]; - int iovcnt = 1; - - iov[0].iov_base = header; - iov[0].iov_len = sizeof(header); - - ACE_Message_Block *mb = data.payload; - - for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ()) - { - iov[iovcnt].iov_base = i->rd_ptr (); - iov[iovcnt].iov_len = i->length (); - iovcnt++; - if (iovcnt >= IOV_MAX) - return -1; - } - - // @@ This pacing stuff here reduced the number of packet lost in - // loopback tests, but it should be taken out for real applications - // (or at least made configurable!) - ACE_Time_Value tv (0, 10000); - ACE_OS::sleep (tv); - - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (iov, iovcnt, to) == -1) - return -1; - -#if 0 - ACE_HEX_DUMP ((LM_DEBUG, - (char*)iov[0].iov_base, iov[0].iov_len, "Sending")); -#endif - - return 0; -} - -int -ACE_RMCast_IO_UDP::send_poll (ACE_RMCast::Poll &, - const ACE_INET_Addr &to) -{ - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - char header[16]; - header[0] = ACE_RMCast::MT_POLL; - - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (header, 1, to) == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_IO_UDP::send_ack_join (ACE_RMCast::Ack_Join &ack_join, - const ACE_INET_Addr &to) -{ - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - char header[16]; - header[0] = ACE_RMCast::MT_ACK_JOIN; - - ACE_UINT32 tmp = ACE_HTONL (ack_join.next_sequence_number); - ACE_OS::memcpy (header + 1, - &tmp, sizeof(ACE_UINT32)); - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (header, 1 + sizeof(ACE_UINT32), to) == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_IO_UDP::send_ack_leave (ACE_RMCast::Ack_Leave &, - const ACE_INET_Addr &to) -{ - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - char header[16]; - header[0] = ACE_RMCast::MT_ACK_LEAVE; - - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (header, 1, to) == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_IO_UDP::send_ack (ACE_RMCast::Ack &ack, - const ACE_INET_Addr &to) -{ - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - char header[16]; - header[0] = ACE_RMCast::MT_ACK; - - ACE_UINT32 tmp = ACE_HTONL (ack.highest_in_sequence); - ACE_OS::memcpy (header + 1, - &tmp, sizeof(ACE_UINT32)); - tmp = ACE_HTONL (ack.highest_received); - ACE_OS::memcpy (header + 1 + sizeof(ACE_UINT32), - &tmp, sizeof(ACE_UINT32)); - - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (header, 1 + 2*sizeof(ACE_UINT32), to) == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_IO_UDP::send_join (ACE_RMCast::Join &, - const ACE_INET_Addr &to) -{ - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - char header[16]; - header[0] = ACE_RMCast::MT_JOIN; - - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (header, 1, to) == -1) - return -1; - - return 0; -} - -int -ACE_RMCast_IO_UDP::send_leave (ACE_RMCast::Leave &, - const ACE_INET_Addr &to) -{ - // @@ TODO: We could keep the header pre-initialized, and only - // update the portions that do change... - char header[16]; - header[0] = ACE_RMCast::MT_LEAVE; - - // ACE_SOCK_MCast_Dgram disallows sending, but it actually works. - ACE_SOCK_Dgram &dgram = this->dgram_; - - if (dgram.send (header, 1, to) == -1) - return -1; - - return 0; -} - - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Null_Mutex>; -template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; -template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_UDP_Proxy*>; - -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |