summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/ace/RMCast/Link.cpp')
-rw-r--r--protocols/ace/RMCast/Link.cpp330
1 files changed, 0 insertions, 330 deletions
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp
deleted file mode 100644
index 3a9fdaea2b3..00000000000
--- a/protocols/ace/RMCast/Link.cpp
+++ /dev/null
@@ -1,330 +0,0 @@
-// file : ace/RMCast/Link.cpp
-// author : Boris Kolpackov <boris@kolpackov.net>
-// cvs-id : $Id$
-
-#include "ace/Time_Value.h" // ACE_Time_Value
-#include "ace/OS_NS_sys_socket.h"
-
-#include "Link.h"
-
-namespace ACE_RMCast
-{
- Link::
- Link (Address const& addr, Parameters const& params)
- : params_ (params),
- addr_ (addr),
- ssock_ (Address (static_cast<unsigned short> (0),
- static_cast<ACE_UINT32> (INADDR_ANY)),
- AF_INET,
- IPPROTO_UDP,
- 1),
- stop_ (false)
-
- {
- srand (time (0));
-
-
- rsock_.set_option (IP_MULTICAST_LOOP, 0);
- // rsock_.set_option (IP_MULTICAST_TTL, 0);
-
- // Set recv/send buffers.
- //
- {
- int r (131070);
- int s (sizeof (r));
-
- static_cast<ACE_SOCK&> (rsock_).set_option (
- SOL_SOCKET, SO_RCVBUF, &r, s);
-
- static_cast<ACE_SOCK&> (ssock_).set_option (
- SOL_SOCKET, SO_RCVBUF, &r, s);
-
- rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
- //cerr << 5 << "recv buffer size: " << r << endl;
-
- ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
- //cerr << 5 << "send buffer size: " << r << endl;
-
- }
-
- // Bind address and port.
- //
- if (ACE_OS::connect (ssock_.get_handle (),
- reinterpret_cast<sockaddr*> (addr_.get_addr ()),
- addr_.get_addr_size ()) == -1)
- {
- perror ("connect: ");
- abort ();
- }
-
-
- ssock_.get_local_addr (self_);
-
- //cerr << 5 << "self: " << self_ << endl;
- }
-
- void Link::
- in_start (In_Element* in)
- {
- Element::in_start (in);
-
- rsock_.join (addr_);
-
- // Start receiving thread.
- //
- recv_mgr_.spawn (recv_thunk, this);
- }
-
- void Link::
- out_start (Out_Element* out)
- {
- Element::out_start (out);
- }
-
- void Link::
- in_stop ()
- {
- // Stop receiving thread.
- //
- {
- Lock l (mutex_);
- stop_ = true;
- }
- recv_mgr_.wait ();
-
- Element::in_stop ();
- }
-
- void Link::
- send (Message_ptr m)
- {
- // Simulate message loss and reordering.
- //
- if (params_.simulator ())
- {
- if ((rand () % 17) != 0)
- {
- Lock l (mutex_);
-
- if (hold_.get ())
- {
- send_ (m);
- send_ (hold_);
- hold_ = Message_ptr (0);
- }
- else
- {
- if ((rand () % 17) != 0)
- {
- send_ (m);
- }
- else
- {
- hold_ = m;
-
- // Make a copy in M so that the reliable loop below
- // won't add FROM and TO to HOLD_.
- //
- m = hold_->clone ();
- }
- }
- }
- }
- else
- send_ (m);
-
- // Reliable loop.
- //
- m->add (Profile_ptr (new From (self_)));
- m->add (Profile_ptr (new To (self_)));
-
- in_->recv (m);
- }
-
- void Link::
- send_ (Message_ptr m)
- {
- ostream os (m->size (), 1); // Always little-endian.
-
- os << *m;
-
- if (os.length () > size_t (params_.max_packet_size ()))
- {
- ACE_ERROR ((LM_ERROR,
- "packet length (%d) exceeds max_poacket_size (%d)\n",
- os.length (), params_.max_packet_size ()));
-
- for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
- {
- ACE_ERROR ((LM_ERROR,
- "profile id: %d; size: %d\n",
- (*i).ext_id_, (*i).int_id_->size ()));
- }
-
- abort ();
- }
-
- ssock_.send (os.buffer (), os.length (), addr_);
-
- /*
- if (m->find (nrtm::id))
- {
- write (1, os.buffer (), os.length ());
- exit (1);
- }
- */
- }
-
- void Link::
- recv ()
- {
- size_t max_packet_size (params_.max_packet_size ());
-
- // This is wicked.
- //
- ACE_Auto_Ptr<char> holder (
- reinterpret_cast<char*> (
- operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
-
- char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
-
- size_t size (0);
-
- while (true)
- {
- //@@ Should I lock here?
- //
-
- Address addr;
-
- // Block for up to one tick waiting for an incomming message.
- //
- for (;;)
- {
- ACE_Time_Value t (params_.tick ());
- ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
-
-
- // Check for cancellation request.
- //
- {
- Lock l (mutex_);
- if (stop_)
- return;
- }
-
- if (r == -1)
- {
- if (errno != ETIME)
- abort ();
- }
- else
- {
- size = static_cast<size_t> (r);
- break;
- }
- }
-
-
- if (size != 4 || addr == self_)
- {
- // Discard bad messages and ones from ourselvs since
- // we are using reliable loopback.
- //
- rsock_.recv (data, 0, addr);
- continue;
- }
-
- u32 msg_size;
- {
- istream is (data, size, 1); // Always little-endian.
- is >> msg_size;
- }
-
- if (msg_size <= 4 || msg_size > max_packet_size)
- {
- // Bad message.
- //
- rsock_.recv (data, 0, addr);
- continue;
- }
-
- size = rsock_.recv (data, max_packet_size, addr);
-
- if (msg_size != size)
- {
- // Bad message.
- //
- continue;
- }
-
- //cerr << 6 << "from: " << addr << endl;
-
- Message_ptr m (new Message ());
-
- m->add (Profile_ptr (new From (addr)));
- m->add (Profile_ptr (new To (self_)));
-
- istream is (data, size, 1); // Always little-endian.
-
- is >> msg_size;
-
- while (true)
- {
- u16 id, size;
-
- if (!((is >> id) && (is >> size))) break;
-
- //cerr << 6 << "reading profile with id " << id << " "
- // << size << " bytes long" << endl;
-
- Profile::Header hdr (id, size);
-
- if (id == SN::id)
- {
- m->add (Profile_ptr (new SN (hdr, is)));
- }
- else if (id == Data::id)
- {
- m->add (Profile_ptr (new Data (hdr, is)));
- }
- else if (id == NAK::id)
- {
- m->add (Profile_ptr (new NAK (hdr, is)));
- }
- else if (id == NRTM::id)
- {
- m->add (Profile_ptr (new NRTM (hdr, is)));
- }
- else if (id == NoData::id)
- {
- m->add (Profile_ptr (new NoData (hdr, is)));
- }
- else if (id == Part::id)
- {
- m->add (Profile_ptr (new Part (hdr, is)));
- }
- else
- {
- //cerr << 0 << "unknown profile id " << hdr.id () << endl;
- abort ();
- }
- }
-
- in_->recv (m);
- }
- }
-
- ACE_THR_FUNC_RETURN Link::
- recv_thunk (void* obj)
- {
- reinterpret_cast<Link*> (obj)->recv ();
- return 0;
- }
-
- void Link::
- recv (Message_ptr)
- {
- abort ();
- }
-}