summaryrefslogtreecommitdiff
path: root/ace/RMCast/RMCast_UDP_Receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/RMCast/RMCast_UDP_Receiver.cpp')
-rw-r--r--ace/RMCast/RMCast_UDP_Receiver.cpp241
1 files changed, 241 insertions, 0 deletions
diff --git a/ace/RMCast/RMCast_UDP_Receiver.cpp b/ace/RMCast/RMCast_UDP_Receiver.cpp
new file mode 100644
index 00000000000..eeb03f50bcf
--- /dev/null
+++ b/ace/RMCast/RMCast_UDP_Receiver.cpp
@@ -0,0 +1,241 @@
+//
+// $Id$
+//
+
+#include "RMCast_UDP_Receiver.h"
+#include "RMCast_Sender_Proxy.h"
+#include "RMCast_Sender_Proxy_Factory.h"
+#include "ace/Handle_Set.h"
+#include "ace/Reactor.h"
+
+#if !defined (__ACE_INLINE__)
+# include "RMCast_UDP_Receiver.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID(ace, RMCast_Receiver, "$Id$")
+
+ACE_RMCast_UDP_Receiver::~ACE_RMCast_UDP_Receiver (void)
+{
+}
+
+int
+ACE_RMCast_UDP_Receiver::subscribe (const ACE_INET_Addr &mcast_addr,
+ int reuse_addr,
+ const ACE_TCHAR *net_if,
+ int protocol_family,
+ int protocol)
+{
+ return this->dgram_.subscribe (mcast_addr,
+ reuse_addr,
+ net_if,
+ protocol_family,
+ protocol);
+}
+
+int
+ACE_RMCast_UDP_Receiver::handle_events (ACE_Time_Value *tv)
+{
+ ACE_HANDLE h = this->dgram_.get_handle ();
+ if (h == ACE_INVALID_HANDLE)
+ return -1;
+
+ ACE_Handle_Set handle_set;
+ handle_set.set_bit (h);
+
+ ACE_Countdown_Time countdown (tv);
+
+ int r = ACE_OS::select (int(h) + 1,
+ handle_set, 0, 0,
+ tv);
+ if (r == -1)
+ {
+ if (errno == EINTR)
+ return 0;
+ else
+ return -1;
+ }
+ else if (r == 0)
+ {
+ return 0;
+ }
+
+ return this->handle_input (h);
+}
+
+int
+ACE_RMCast_UDP_Receiver::register_handlers (ACE_Reactor *reactor)
+{
+ this->eh_.reactor (reactor);
+ return reactor->register_handler (&this->eh_,
+ ACE_Event_Handler::READ_MASK);
+}
+
+int
+ACE_RMCast_UDP_Receiver::remove_handlers (void)
+{
+ ACE_Reactor *r = this->eh_.reactor ();
+ if (r != 0)
+ {
+ r->remove_handler (&this->eh_,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ this->eh_.reactor (0);
+ }
+ return 0;
+}
+
+int
+ACE_RMCast_UDP_Receiver::handle_input (ACE_HANDLE)
+{
+ // @@ We should use a system constant instead of this literal
+ const int max_udp_packet_size = 65536;
+ char buffer[max_udp_packet_size];
+
+ ACE_INET_Addr from_address;
+ ssize_t r =
+ this->dgram_.recv (buffer, sizeof(buffer), from_address);
+
+ if (r == -1)
+ {
+ // @@ LOG??
+ ACE_DEBUG ((LM_DEBUG,
+ "RMCast_UDP_Receiver::handle_input () - "
+ "error in recv\n"));
+ return -1;
+ }
+
+ // ACE_HEX_DUMP ((LM_DEBUG, buffer, 16, "Receiver::handle_input"));
+
+ // @@ Locking!
+
+ int type = buffer[0];
+
+ ACE_RMCast_Sender_Proxy *sender_proxy;
+ if (this->map_.find (from_address, sender_proxy) != 0)
+ {
+ // State == RS_NON_EXISTENT
+
+ if (type == ACE_RMCast::MT_ACK
+ || type == ACE_RMCast::MT_JOIN
+ || type == ACE_RMCast::MT_LEAVE
+ || type == ACE_RMCast::MT_ACK_LEAVE)
+ {
+ // All these message types indicate a problem, the should be
+ // generated by receivers, not received by them.
+ return 0;
+ }
+
+ // The message type is valid, we must create a new proxy,
+ // initially in the JOINING state...
+ sender_proxy =
+ this->factory_->create ();
+ if (sender_proxy == 0)
+ {
+ // @@ LOG??
+ return 0;
+ }
+ if (this->map_.bind (from_address, sender_proxy) != 0)
+ {
+ // @@ LOG??
+ return 0;
+ }
+
+ // Send back a JOIN message...
+ return sender_proxy->receive_message (buffer, r);
+ }
+
+ if (type == ACE_RMCast::MT_ACK
+ || type == ACE_RMCast::MT_JOIN
+ || type == ACE_RMCast::MT_LEAVE
+ || type == ACE_RMCast::MT_ACK_LEAVE
+ || type < 0
+ || type >= ACE_RMCast::MT_LAST)
+ {
+ // In this case the message is invalid, but the proxy is already
+ // in the table, must destroy it because there was a violation
+ // in the protocol....
+
+ this->factory_->destroy (sender_proxy);
+ this->map_.unbind (from_address);
+ return 0;
+ }
+
+ return sender_proxy->receive_message (buffer, r);
+}
+
+ACE_HANDLE
+ACE_RMCast_UDP_Receiver::get_handle (void) const
+{
+ return this->dgram_.get_handle ();
+}
+
+#if 0
+int
+ACE_RMCast_UDP_Receiver::send_join (ACE_INET_Addr &from)
+{
+ char buffer[16];
+ buffer[0] = ACE_RMCast::MT_JOIN;
+
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+ ssize_t r = dgram.send (buffer, 1, from);
+
+ if (r == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_UDP_Receiver::send_ack (ACE_RMCast_Sender_Proxy *sender_proxy,
+ ACE_INET_Addr &from)
+{
+ char buffer[16];
+ buffer[0] = ACE_RMCast::MT_ACK;
+
+ ACE_UINT32 expected = sender_proxy->expected ();
+ expected = ACE_HTONL (expected);
+
+ ACE_UINT32 last_received = sender_proxy->last_received ();
+ last_received = ACE_HTONL (last_received);
+
+ ACE_OS::memcpy (buffer + 1, &expected, sizeof(expected));
+ ACE_OS::memcpy (buffer + 1 + sizeof(expected), &last_received,
+ sizeof(last_received));
+
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+ ssize_t r = dgram.send (buffer,
+ + sizeof(expected)
+ + sizeof(last_received),
+ from);
+
+ if (r == -1)
+ return -1;
+
+ return 0;
+}
+
+int
+ACE_RMCast_UDP_Receiver::send_leave (ACE_INET_Addr &from)
+{
+ char buffer[16];
+ buffer[0] = ACE_RMCast::MT_LEAVE;
+
+ ACE_SOCK_Dgram &dgram = this->dgram_;
+ ssize_t r = dgram.send (buffer, 1, from);
+
+ if (r == -1)
+ return -1;
+
+ return 0;
+}
+#endif /* 0 */
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Hash_Map_Manager<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Manager_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Iterator_Base_Ex<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*,ACE_Hash<ACE_INET_Addr>,ACE_Equal_To<ACE_INET_Addr>,ACE_Null_Mutex>;
+template class ACE_Hash_Map_Entry<ACE_INET_Addr,ACE_RMCast_Sender_Proxy*>;
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */