diff options
Diffstat (limited to 'protocols/ace/RMCast/Link.cpp')
-rw-r--r-- | protocols/ace/RMCast/Link.cpp | 330 |
1 files changed, 0 insertions, 330 deletions
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp deleted file mode 100644 index 3a9fdaea2b3..00000000000 --- a/protocols/ace/RMCast/Link.cpp +++ /dev/null @@ -1,330 +0,0 @@ -// file : ace/RMCast/Link.cpp -// author : Boris Kolpackov <boris@kolpackov.net> -// cvs-id : $Id$ - -#include "ace/Time_Value.h" // ACE_Time_Value -#include "ace/OS_NS_sys_socket.h" - -#include "Link.h" - -namespace ACE_RMCast -{ - Link:: - Link (Address const& addr, Parameters const& params) - : params_ (params), - addr_ (addr), - ssock_ (Address (static_cast<unsigned short> (0), - static_cast<ACE_UINT32> (INADDR_ANY)), - AF_INET, - IPPROTO_UDP, - 1), - stop_ (false) - - { - srand (time (0)); - - - rsock_.set_option (IP_MULTICAST_LOOP, 0); - // rsock_.set_option (IP_MULTICAST_TTL, 0); - - // Set recv/send buffers. - // - { - int r (131070); - int s (sizeof (r)); - - static_cast<ACE_SOCK&> (rsock_).set_option ( - SOL_SOCKET, SO_RCVBUF, &r, s); - - static_cast<ACE_SOCK&> (ssock_).set_option ( - SOL_SOCKET, SO_RCVBUF, &r, s); - - rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); - //cerr << 5 << "recv buffer size: " << r << endl; - - ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); - //cerr << 5 << "send buffer size: " << r << endl; - - } - - // Bind address and port. - // - if (ACE_OS::connect (ssock_.get_handle (), - reinterpret_cast<sockaddr*> (addr_.get_addr ()), - addr_.get_addr_size ()) == -1) - { - perror ("connect: "); - abort (); - } - - - ssock_.get_local_addr (self_); - - //cerr << 5 << "self: " << self_ << endl; - } - - void Link:: - in_start (In_Element* in) - { - Element::in_start (in); - - rsock_.join (addr_); - - // Start receiving thread. - // - recv_mgr_.spawn (recv_thunk, this); - } - - void Link:: - out_start (Out_Element* out) - { - Element::out_start (out); - } - - void Link:: - in_stop () - { - // Stop receiving thread. - // - { - Lock l (mutex_); - stop_ = true; - } - recv_mgr_.wait (); - - Element::in_stop (); - } - - void Link:: - send (Message_ptr m) - { - // Simulate message loss and reordering. - // - if (params_.simulator ()) - { - if ((rand () % 17) != 0) - { - Lock l (mutex_); - - if (hold_.get ()) - { - send_ (m); - send_ (hold_); - hold_ = Message_ptr (0); - } - else - { - if ((rand () % 17) != 0) - { - send_ (m); - } - else - { - hold_ = m; - - // Make a copy in M so that the reliable loop below - // won't add FROM and TO to HOLD_. - // - m = hold_->clone (); - } - } - } - } - else - send_ (m); - - // Reliable loop. - // - m->add (Profile_ptr (new From (self_))); - m->add (Profile_ptr (new To (self_))); - - in_->recv (m); - } - - void Link:: - send_ (Message_ptr m) - { - ostream os (m->size (), 1); // Always little-endian. - - os << *m; - - if (os.length () > size_t (params_.max_packet_size ())) - { - ACE_ERROR ((LM_ERROR, - "packet length (%d) exceeds max_poacket_size (%d)\n", - os.length (), params_.max_packet_size ())); - - for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ()) - { - ACE_ERROR ((LM_ERROR, - "profile id: %d; size: %d\n", - (*i).ext_id_, (*i).int_id_->size ())); - } - - abort (); - } - - ssock_.send (os.buffer (), os.length (), addr_); - - /* - if (m->find (nrtm::id)) - { - write (1, os.buffer (), os.length ()); - exit (1); - } - */ - } - - void Link:: - recv () - { - size_t max_packet_size (params_.max_packet_size ()); - - // This is wicked. - // - ACE_Auto_Ptr<char> holder ( - reinterpret_cast<char*> ( - operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT))); - - char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT); - - size_t size (0); - - while (true) - { - //@@ Should I lock here? - // - - Address addr; - - // Block for up to one tick waiting for an incomming message. - // - for (;;) - { - ACE_Time_Value t (params_.tick ()); - ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t); - - - // Check for cancellation request. - // - { - Lock l (mutex_); - if (stop_) - return; - } - - if (r == -1) - { - if (errno != ETIME) - abort (); - } - else - { - size = static_cast<size_t> (r); - break; - } - } - - - if (size != 4 || addr == self_) - { - // Discard bad messages and ones from ourselvs since - // we are using reliable loopback. - // - rsock_.recv (data, 0, addr); - continue; - } - - u32 msg_size; - { - istream is (data, size, 1); // Always little-endian. - is >> msg_size; - } - - if (msg_size <= 4 || msg_size > max_packet_size) - { - // Bad message. - // - rsock_.recv (data, 0, addr); - continue; - } - - size = rsock_.recv (data, max_packet_size, addr); - - if (msg_size != size) - { - // Bad message. - // - continue; - } - - //cerr << 6 << "from: " << addr << endl; - - Message_ptr m (new Message ()); - - m->add (Profile_ptr (new From (addr))); - m->add (Profile_ptr (new To (self_))); - - istream is (data, size, 1); // Always little-endian. - - is >> msg_size; - - while (true) - { - u16 id, size; - - if (!((is >> id) && (is >> size))) break; - - //cerr << 6 << "reading profile with id " << id << " " - // << size << " bytes long" << endl; - - Profile::Header hdr (id, size); - - if (id == SN::id) - { - m->add (Profile_ptr (new SN (hdr, is))); - } - else if (id == Data::id) - { - m->add (Profile_ptr (new Data (hdr, is))); - } - else if (id == NAK::id) - { - m->add (Profile_ptr (new NAK (hdr, is))); - } - else if (id == NRTM::id) - { - m->add (Profile_ptr (new NRTM (hdr, is))); - } - else if (id == NoData::id) - { - m->add (Profile_ptr (new NoData (hdr, is))); - } - else if (id == Part::id) - { - m->add (Profile_ptr (new Part (hdr, is))); - } - else - { - //cerr << 0 << "unknown profile id " << hdr.id () << endl; - abort (); - } - } - - in_->recv (m); - } - } - - ACE_THR_FUNC_RETURN Link:: - recv_thunk (void* obj) - { - reinterpret_cast<Link*> (obj)->recv (); - return 0; - } - - void Link:: - recv (Message_ptr) - { - abort (); - } -} |