diff options
Diffstat (limited to 'ace/RMCast/RMCast_UDP_Receiver.cpp')
-rw-r--r-- | ace/RMCast/RMCast_UDP_Receiver.cpp | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/ace/RMCast/RMCast_UDP_Receiver.cpp b/ace/RMCast/RMCast_UDP_Receiver.cpp new file mode 100644 index 00000000000..eeb03f50bcf --- /dev/null +++ b/ace/RMCast/RMCast_UDP_Receiver.cpp @@ -0,0 +1,241 @@ +// +// $Id$ +// + +#include "RMCast_UDP_Receiver.h" +#include "RMCast_Sender_Proxy.h" +#include "RMCast_Sender_Proxy_Factory.h" +#include "ace/Handle_Set.h" +#include "ace/Reactor.h" + +#if !defined (__ACE_INLINE__) +# include "RMCast_UDP_Receiver.i" +#endif /* ! __ACE_INLINE__ */ + +ACE_RCSID(ace, RMCast_Receiver, "$Id$") + +ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void) +{ +} + +int +ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr, + int reuse_addr, + const ACE_TCHAR *net_if, + int protocol_family, + int protocol) +{ + return this->dgram_.subscribe (mcast_addr, + reuse_addr, + net_if, + protocol_family, + protocol); +} + +int +ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv) +{ + ACE_HANDLE h = this->dgram_.get_handle (); + if (h == ACE_INVALID_HANDLE) + return -1; + + ACE_Handle_Set handle_set; + handle_set.set_bit (h); + + ACE_Countdown_Time countdown (tv); + + int r = ACE_OS::select (int(h) + 1, + handle_set, 0, 0, + tv); + if (r == -1) + { + if (errno == EINTR) + return 0; + else + return -1; + } + else if (r == 0) + { + return 0; + } + + return this->handle_input (h); +} + +int +ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor) +{ + this->eh_.reactor (reactor); + return reactor->register_handler (&this->eh_, + ACE_Event_Handler::READ_MASK); +} + +int +ACE_RMCast_UDP_Receiver::remove_handlers (void) +{ + ACE_Reactor *r = this->eh_.reactor (); + if (r != 0) + { + r->remove_handler (&this->eh_, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + this->eh_.reactor (0); + } + return 0; +} + +int +ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE) +{ + // @@ We should use a system constant instead of this literal + const int max_udp_packet_size = 65536; + char buffer[max_udp_packet_size]; + + ACE_INET_Addr from_address; + ssize_t r = + this->dgram_.recv (buffer, sizeof(buffer), from_address); + + if (r == -1) + { + // @@ LOG?? + ACE_DEBUG ((LM_DEBUG, + "RMCast_UDP_Receiver::handle_input () - " + "error in recv\n")); + return -1; + } + + // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input")); + + // @@ Locking! + + int type = buffer[0]; + + ACE_RMCast_Sender_Proxy *sender_proxy; + if (this->map_.find (from_address, sender_proxy) != 0) + { + // State == RS_NON_EXISTENT + + if (type == ACE_RMCast::MT_ACK + || type == ACE_RMCast::MT_JOIN + || type == ACE_RMCast::MT_LEAVE + || type == ACE_RMCast::MT_ACK_LEAVE) + { + // All these message types indicate a problem, the should be + // generated by receivers, not received by them. + return 0; + } + + // The message type is valid, we must create a new proxy, + // initially in the JOINING state... + sender_proxy = + this->factory_->create (); + if (sender_proxy == 0) + { + // @@ LOG?? + return 0; + } + if (this->map_.bind (from_address, sender_proxy) != 0) + { + // @@ LOG?? + return 0; + } + + // Send back a JOIN message... + return sender_proxy->receive_message (buffer, r); + } + + if (type == ACE_RMCast::MT_ACK + || type == ACE_RMCast::MT_JOIN + || type == ACE_RMCast::MT_LEAVE + || type == ACE_RMCast::MT_ACK_LEAVE + || type < 0 + || type >= ACE_RMCast::MT_LAST) + { + // In this case the message is invalid, but the proxy is already + // in the table, must destroy it because there was a violation + // in the protocol.... + + this->factory_->destroy (sender_proxy); + this->map_.unbind (from_address); + return 0; + } + + return sender_proxy->receive_message (buffer, r); +} + +ACE_HANDLE +ACE_RMCast_UDP_Receiver::get_handle (void) const +{ + return this->dgram_.get_handle (); +} + +#if 0 +int +ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from) +{ + char buffer[16]; + buffer[0] = ACE_RMCast::MT_JOIN; + + ACE_SOCK_Dgram &dgram = this->dgram_; + ssize_t r = dgram.send (buffer, 1, from); + + if (r == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy, + ACE_INET_Addr &from) +{ + char buffer[16]; + buffer[0] = ACE_RMCast::MT_ACK; + + ACE_UINT32 expected = sender_proxy->expected (); + expected = ACE_HTONL (expected); + + ACE_UINT32 last_received = sender_proxy->last_received (); + last_received = ACE_HTONL (last_received); + + ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected)); + ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received, + sizeof(last_received)); + + ACE_SOCK_Dgram &dgram = this->dgram_; + ssize_t r = dgram.send (buffer, + + sizeof(expected) + + sizeof(last_received), + from); + + if (r == -1) + return -1; + + return 0; +} + +int +ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from) +{ + char buffer[16]; + buffer[0] = ACE_RMCast::MT_LEAVE; + + ACE_SOCK_Dgram &dgram = this->dgram_; + ssize_t r = dgram.send (buffer, 1, from); + + if (r == -1) + return -1; + + return 0; +} +#endif /* 0 */ + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>; +template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*>; + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |