diff options
Diffstat (limited to 'protocols/ace/RMCast/Link.cpp')
-rw-r--r-- | protocols/ace/RMCast/Link.cpp | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp new file mode 100644 index 00000000000..e6898cfd76c --- /dev/null +++ b/protocols/ace/RMCast/Link.cpp @@ -0,0 +1,271 @@ +// file : ace/RMCast/Link.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/RMCast/Link.h> + +namespace ACE_RMCast +{ + Link:: + Link (Address const& addr) + : addr_ (addr), + ssock_ (Address ((unsigned short) (0), INADDR_ANY), + AF_INET, + IPPROTO_UDP, + 1) + + { + srand (time (0)); + + + rsock_.set_option (IP_MULTICAST_LOOP, 0); + + // Set recv/send buffers. + // + { + int r (131070); + int s (sizeof (r)); + + static_cast<ACE_SOCK&> (rsock_).set_option ( + SOL_SOCKET, SO_RCVBUF, &r, s); + + static_cast<ACE_SOCK&> (ssock_).set_option ( + SOL_SOCKET, SO_RCVBUF, &r, s); + + rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); + //cerr << 5 << "recv buffer size: " << r << endl; + + ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); + //cerr << 5 << "send buffer size: " << r << endl; + + } + + // Bind address and port. + // + if (ACE_OS::connect (ssock_.get_handle (), + reinterpret_cast<sockaddr*> (addr_.get_addr ()), + addr_.get_addr_size ()) == -1) + { + perror ("connect: "); + abort (); + } + + + ssock_.get_local_addr (self_); + + //cerr << 5 << "self: " << self_ << endl; + } + + void Link:: + in_start (In_Element* in) + { + Element::in_start (in); + + rsock_.join (addr_); + + // Start receiving thread. + // + recv_mgr_.spawn (recv_thunk, this); + } + + void Link:: + out_start (Out_Element* out) + { + Element::out_start (out); + } + + void Link:: + in_stop () + { + // Stop receiving thread. + // + recv_mgr_.cancel_all (1); + recv_mgr_.wait (); + + Element::in_stop (); + } + + void Link:: + send (Message_ptr m) + { + bool const sim = false; + + // Simulate message loss and reordering. + // + if (sim) + { + if ((rand () % 5) != 0) + { + Lock l (mutex_); + + if (hold_.get ()) + { + send_ (m); + send_ (hold_); + hold_ = 0; + } + else + { + hold_ = m; + + // Make a copy in M so that the reliable loop below + // won't add FROM and TO to HOLD_. + // + m = Message_ptr (new Message (*hold_)); + } + } + } + else + send_ (m); + + // Reliable loop. + // + m->add (Profile_ptr (new From (self_))); + m->add (Profile_ptr (new To (self_))); + + in_->recv (m); + } + + void Link:: + send_ (Message_ptr m) + { + ostream os (m->size (), 1); // Always little-endian. + + os << *m; + + ssock_.send (os.buffer (), os.length (), addr_); + + /* + if (m->find (nrtm::id)) + { + write (1, os.buffer (), os.length ()); + exit (1); + } + */ + } + + void Link:: + recv () + { + // I could have used ACE_Data_Block but it does not support + // resizing... + // + size_t size (0), capacity (8192); + char* data (reinterpret_cast<char*> (operator new (capacity))); + + auto_ptr<char> holder (data); // This is wicked. + + while (true) + { + //@@ Should I lock here? + // + + Address addr; + + //@@ CDR-specific. + // + size = rsock_.recv (data, 4, addr, MSG_PEEK); + + if (size != 4 || addr == self_) + { + // Discard bad messages and ones from ourselvs since + // we are using reliable loopback. + // + rsock_.recv (data, 0, addr); + continue; + } + + u32 msg_size; + { + istream is (data, size, 1); // Always little-endian. + is >> msg_size; + } + + if (msg_size <= 4) + { + // Bad message. + // + rsock_.recv (data, 0, addr); + continue; + } + + if (capacity < msg_size) + { + capacity = msg_size; + holder = auto_ptr<char> (0); + data = reinterpret_cast<char*> (operator new (capacity)); + holder = auto_ptr<char> (data); + } + + size = rsock_.recv (data, capacity, addr); + + if (msg_size != size) + { + // Bad message. + // + continue; + } + + //cerr << 6 << "from: " << addr << endl; + + Message_ptr m (new Message ()); + + m->add (Profile_ptr (new From (addr))); + m->add (Profile_ptr (new To (self_))); + + istream is (data, size, 1); // Always little-endian. + + is >> msg_size; + + while (true) + { + u16 id, size; + + if (!((is >> id) && (is >> size))) break; + + //cerr << 6 << "reading profile with id " << id << " " + // << size << " bytes long" << endl; + + Profile::Header hdr (id, size); + + switch (hdr.id ()) + { + case SN::id: + { + m->add (Profile_ptr (new SN (hdr, is))); + break; + } + case Data::id: + { + m->add (Profile_ptr (new Data (hdr, is))); + break; + } + case NAK::id: + { + m->add (Profile_ptr (new NAK (hdr, is))); + break; + } + case NRTM::id: + { + m->add (Profile_ptr (new NRTM (hdr, is))); + break; + } + default: + { + //cerr << 0 << "unknown profile id " << hdr.id () << endl; + abort (); + } + } + } + + in_->recv (m); + } + } + + ACE_THR_FUNC_RETURN Link:: + recv_thunk (void* obj) + { + reinterpret_cast<Link*> (obj)->recv (); + return 0; + } +} |