// // $Id$ // #include "RMCast_UDP_Receiver.h" #include "RMCast_Sender_Proxy.h" #include "RMCast_Sender_Proxy_Factory.h" #include "ace/Handle_Set.h" #include "ace/Reactor.h" #if !defined (__ACE_INLINE__) # include "RMCast_UDP_Receiver.i" #endif /* ! __ACE_INLINE__ */ ACE_RCSID(ace, RMCast_Receiver, "$Id$") ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void) { } int ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr, int reuse_addr, const ACE_TCHAR *net_if, int protocol_family, int protocol) { return this->dgram_.subscribe (mcast_addr, reuse_addr, net_if, protocol_family, protocol); } int ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv) { ACE_HANDLE h = this->dgram_.get_handle (); if (h == ACE_INVALID_HANDLE) return -1; ACE_Handle_Set handle_set; handle_set.set_bit (h); ACE_Countdown_Time countdown (tv); int r = ACE_OS::select (int(h) + 1, handle_set, 0, 0, tv); if (r == -1) { if (errno == EINTR) return 0; else return -1; } else if (r == 0) { return 0; } return this->handle_input (h); } int ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor) { this->eh_.reactor (reactor); return reactor->register_handler (&this->eh_, ACE_Event_Handler::READ_MASK); } int ACE_RMCast_UDP_Receiver::remove_handlers (void) { ACE_Reactor *r = this->eh_.reactor (); if (r != 0) { r->remove_handler (&this->eh_, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); this->eh_.reactor (0); } return 0; } int ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE) { // @@ We should use a system constant instead of this literal const int max_udp_packet_size = 65536; char buffer[max_udp_packet_size]; ACE_INET_Addr from_address; ssize_t r = this->dgram_.recv (buffer, sizeof(buffer), from_address); if (r == -1) { // @@ LOG?? ACE_DEBUG ((LM_DEBUG, "RMCast_UDP_Receiver::handle_input () - " "error in recv\n")); return -1; } // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input")); // @@ Locking! int type = buffer[0]; ACE_RMCast_Sender_Proxy *sender_proxy; if (this->map_.find (from_address, sender_proxy) != 0) { // State == RS_NON_EXISTENT if (type == ACE_RMCast::MT_ACK || type == ACE_RMCast::MT_JOIN || type == ACE_RMCast::MT_LEAVE || type == ACE_RMCast::MT_ACK_LEAVE) { // All these message types indicate a problem, the should be // generated by receivers, not received by them. return 0; } // The message type is valid, we must create a new proxy, // initially in the JOINING state... sender_proxy = this->factory_->create (); if (sender_proxy == 0) { // @@ LOG?? return 0; } if (this->map_.bind (from_address, sender_proxy) != 0) { // @@ LOG?? return 0; } // Send back a JOIN message... return sender_proxy->receive_message (buffer, r); } if (type == ACE_RMCast::MT_ACK || type == ACE_RMCast::MT_JOIN || type == ACE_RMCast::MT_LEAVE || type == ACE_RMCast::MT_ACK_LEAVE || type < 0 || type >= ACE_RMCast::MT_LAST) { // In this case the message is invalid, but the proxy is already // in the table, must destroy it because there was a violation // in the protocol.... this->factory_->destroy (sender_proxy); this->map_.unbind (from_address); return 0; } return sender_proxy->receive_message (buffer, r); } ACE_HANDLE ACE_RMCast_UDP_Receiver::get_handle (void) const { return this->dgram_.get_handle (); } #if 0 int ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from) { char buffer[16]; buffer[0] = ACE_RMCast::MT_JOIN; ACE_SOCK_Dgram &dgram = this->dgram_; ssize_t r = dgram.send (buffer, 1, from); if (r == -1) return -1; return 0; } int ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy, ACE_INET_Addr &from) { char buffer[16]; buffer[0] = ACE_RMCast::MT_ACK; ACE_UINT32 expected = sender_proxy->expected (); expected = ACE_HTONL (expected); ACE_UINT32 last_received = sender_proxy->last_received (); last_received = ACE_HTONL (last_received); ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected)); ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received, sizeof(last_received)); ACE_SOCK_Dgram &dgram = this->dgram_; ssize_t r = dgram.send (buffer, + sizeof(expected) + sizeof(last_received), from); if (r == -1) return -1; return 0; } int ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from) { char buffer[16]; buffer[0] = ACE_RMCast::MT_LEAVE; ACE_SOCK_Dgram &dgram = this->dgram_; ssize_t r = dgram.send (buffer, 1, from); if (r == -1) return -1; return 0; } #endif /* 0 */ #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Hash_Map_Manager; template class ACE_Hash_Map_Manager_Ex,ACE_Equal_To,ACE_Null_Mutex>; template class ACE_Hash_Map_Iterator; template class ACE_Hash_Map_Iterator_Base_Ex,ACE_Equal_To,ACE_Null_Mutex>; template class ACE_Hash_Map_Entry; #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */