summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOssama Othman <ossama-othman@users.noreply.github.com>2005-02-08 08:33:44 +0000
committerOssama Othman <ossama-othman@users.noreply.github.com>2005-02-08 08:33:44 +0000
commitab82f89dc8fde2725888b2577b6c44f113d3040f (patch)
tree8a9759f3be81fe4eba3a3d42e82609af907c60a4
parent1b01c3c7544d1454ced15a8eae026f96758102a8 (diff)
downloadATCD-unlabeled-1.1.2.tar.gz
ChangeLogTag:Tue Feb 8 00:22:48 2005 Ossama Othman <ossama@dre.vanderbilt.edu>unlabeled-1.1.2
-rw-r--r--protocols/ace/RMCast/Acknowledge.cpp315
-rw-r--r--protocols/ace/RMCast/Acknowledge.h234
-rw-r--r--protocols/ace/RMCast/Agent.tar.bz2bin0 -> 8046 bytes
-rw-r--r--protocols/ace/RMCast/Bits.h28
-rw-r--r--protocols/ace/RMCast/Link.cpp271
-rw-r--r--protocols/ace/RMCast/Link.h62
-rw-r--r--protocols/ace/RMCast/Protocol.cpp9
-rw-r--r--protocols/ace/RMCast/Protocol.h715
-rw-r--r--protocols/ace/RMCast/RMCast.mpc7
-rw-r--r--protocols/ace/RMCast/Retransmit.cpp132
-rw-r--r--protocols/ace/RMCast/Retransmit.h97
-rw-r--r--protocols/ace/RMCast/Simulator.cpp40
-rw-r--r--protocols/ace/RMCast/Simulator.h30
-rw-r--r--protocols/ace/RMCast/Socket.cpp112
-rw-r--r--protocols/ace/RMCast/Stack.cpp9
-rw-r--r--protocols/ace/RMCast/Stack.h87
16 files changed, 2148 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp
new file mode 100644
index 00000000000..e0d7ce6f376
--- /dev/null
+++ b/protocols/ace/RMCast/Acknowledge.cpp
@@ -0,0 +1,315 @@
+// file : ace/RMCast/Acknowledge.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/OS.h>
+
+#include <ace/RMCast/Acknowledge.h>
+
+namespace ACE_RMCast
+{
+ ACE_Time_Value const tick (0, 5000);
+ unsigned long const nak_timeout = 20; // # of ticks.
+ unsigned long const nrtm_timeout = 50; // # of ticks.
+
+ Acknowledge::
+ Acknowledge ()
+ : nrtm_timer_ (nrtm_timeout)
+ {
+ }
+
+ void Acknowledge::
+ in_start (In_Element* in)
+ {
+ Element::in_start (in);
+ }
+
+ void Acknowledge::
+ out_start (Out_Element* out)
+ {
+ Element::out_start (out);
+
+ tracker_mgr_.spawn (track_thunk, this);
+ }
+
+ void Acknowledge::
+ out_stop ()
+ {
+ tracker_mgr_.cancel_all (1);
+ tracker_mgr_.wait ();
+
+ Element::out_stop ();
+ }
+
+ void Acknowledge::
+ collapse (Queue& q)
+ {
+ // I would normally use iterators in the logic below but ACE_Map_Manager
+ // iterates over entries in no particular order so it is pretty much
+ // unusable here. Instead we will do slow and cumbersome find's.
+ //
+
+ u64 sn (q.sn () + 1);
+
+ for (;; ++sn)
+ {
+ Queue::ENTRY* e;
+
+ if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
+
+ Message_ptr m (e->int_id_.msg ());
+ q.unbind (sn);
+
+ in_->recv (m);
+ }
+
+ q.sn (sn - 1);
+ }
+
+ void Acknowledge::
+ track ()
+ {
+ while (true)
+ {
+ Messages msgs;
+
+ {
+ Lock l (mutex_);
+
+ if (hold_.current_size () != 0)
+ {
+ for (Map::iterator i (hold_.begin ()), e (hold_.end ());
+ i != e;
+ ++i)
+ {
+ Queue& q ((*i).int_id_);
+
+ if (q.current_size () == 0) continue;
+
+ track_queue ((*i).ext_id_, q, msgs);
+ }
+ }
+
+ if (--nrtm_timer_ == 0)
+ {
+ nrtm_timer_ = nrtm_timeout;
+
+ // Send NRTM.
+ //
+ Profile_ptr nrtm (create_nrtm ());
+
+ if (nrtm.get ())
+ {
+ Message_ptr m (new Message);
+ m->add (nrtm);
+ msgs.push_back (m);
+
+ }
+ }
+ }
+
+ // Send stuff off.
+ //
+ for (Messages::Iterator i (msgs); !i.done (); i.advance ())
+ {
+ Message_ptr* ppm;
+ i.next (ppm);
+ send (*ppm);
+ }
+
+ ACE_OS::sleep (tick);
+ }
+ }
+
+ void Acknowledge::
+ track_queue (Address const& addr, Queue& q, Messages& msgs)
+ {
+ NAK_ptr nak (new NAK (addr));
+
+ // Track existing losses.
+ //
+ for (Queue::iterator i (q.begin ()), e (q.end ()); i != e; ++i)
+ {
+ u64 sn ((*i).ext_id_);
+ Descr& d ((*i).int_id_);
+
+ if (d.lost ())
+ {
+ d.timer (d.timer () - 1);
+
+ if (d.timer () == 0)
+ {
+ //@@ Need exp fallback.
+ //
+ d.nak_count (d.nak_count () + 1);
+ d.timer ((d.nak_count () + 1) * nak_timeout);
+
+ nak->add (sn);
+
+ //cerr << 6 << "NAK # " << d.nak_count () << ": "
+ // << addr << " " << sn << endl;
+ }
+ }
+ }
+
+ // Send NAK.
+ //
+ if (nak->count ())
+ {
+ // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
+ // << endl;
+
+ Message_ptr m (new Message);
+
+ m->add (Profile_ptr (nak.release ()));
+
+ msgs.push_back (m);
+ }
+
+ // Detect and record new losses.
+ //
+ for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
+ {
+ if (q.find (sn) == -1)
+ {
+ q.bind (sn, Descr (1));
+ }
+ }
+ }
+
+ void Acknowledge::
+ recv (Message_ptr m)
+ {
+ // Handle NRTM. There could be some nasty interaction with code
+ // that handles data below (like missing message and NAK). This
+ // is why I hold the lock at the beginning (which may be not very
+ // efficient).
+ //
+ Lock l (mutex_);
+
+ if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
+ {
+ for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
+ {
+ u64 sn (nrtm->find ((*i).ext_id_));
+
+ if (sn != 0)
+ {
+ Queue& q ((*i).int_id_);
+
+ u64 old (q.max_sn ());
+
+ if (old < sn)
+ {
+ // Mark as lost.
+ //
+ q.bind (sn, Descr (1));
+ }
+ }
+ }
+ }
+
+ if (m->find (Data::id))
+ {
+ Address from (
+ static_cast<From const*> (m->find (From::id))->address ());
+
+ u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
+
+ Map::ENTRY* e;
+
+ if (hold_.find (from, e) == -1)
+ {
+ // First message from this source.
+ //
+ hold_.bind (from, Queue (sn));
+ hold_.find (from, e);
+
+ in_->recv (m);
+ }
+ else
+ {
+ Queue& q (e->int_id_);
+
+ if (sn <= q.sn ())
+ {
+ // Duplicate.
+ //
+ //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
+ // << endl;
+ }
+ else if (sn == q.sn () + 1)
+ {
+ // Next message.
+ //
+
+ q.rebind (sn, Descr (m));
+ collapse (q);
+ }
+ else
+ {
+ // Some messages are missing. Insert this one into the queue.
+ //
+ q.rebind (sn, Descr (m));
+ }
+ }
+ }
+ else
+ {
+ l.release ();
+
+ // Just forward it up.
+ //
+ in_->recv (m);
+ }
+ }
+
+ void Acknowledge::
+ send (Message_ptr m)
+ {
+ if (m->find (Data::id) != 0)
+ {
+ Lock l (mutex_);
+
+ Profile_ptr nrtm (create_nrtm ());
+
+ if (nrtm.get ()) m->add (nrtm);
+
+ nrtm_timer_ = nrtm_timeout; // Reset timer.
+ }
+
+ out_->send (m);
+ }
+
+ Profile_ptr Acknowledge::
+ create_nrtm ()
+ {
+ // Prepare NRTM.
+ //
+ NRTM_ptr nrtm (new NRTM ());
+
+ // Gather the information.
+ //
+ {
+ for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
+ {
+ Address addr ((*i).ext_id_);
+ Queue& q ((*i).int_id_);
+
+ //@@ Should look for the highest known number.
+ //
+ nrtm->insert (addr, q.sn ());
+ }
+ }
+
+ if (nrtm->empty ()) return 0;
+ else return Profile_ptr (nrtm.release ());
+ }
+
+ ACE_THR_FUNC_RETURN Acknowledge::
+ track_thunk (void* obj)
+ {
+ reinterpret_cast<Acknowledge*> (obj)->track ();
+ return 0;
+ }
+}
diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h
new file mode 100644
index 00000000000..4289ec729db
--- /dev/null
+++ b/protocols/ace/RMCast/Acknowledge.h
@@ -0,0 +1,234 @@
+// file : ace/RMCast/Acknowledge.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_ACKNOWLEDGE_H
+#define ACE_RMCAST_ACKNOWLEDGE_H
+
+#include <ace/Hash_Map_Manager.h>
+#include <ace/Thread_Manager.h>
+
+#include <ace/RMCast/Stack.h>
+#include <ace/RMCast/Protocol.h>
+#include <ace/RMCast/Bits.h>
+
+namespace ACE_RMCast
+{
+ class Acknowledge : public Element
+ {
+ public:
+ Acknowledge ();
+
+ virtual void
+ in_start (In_Element* in);
+
+ virtual void
+ out_start (Out_Element* out);
+
+ virtual void
+ out_stop ();
+
+ public:
+ virtual void
+ recv (Message_ptr m);
+
+ virtual void
+ send (Message_ptr m);
+
+ private:
+ struct Descr
+ {
+ //@@ There should be no default c-tor.
+ //
+ Descr ()
+ : nak_count_ (0), timer_ (1)
+ {
+ }
+
+ Descr (unsigned long timer)
+ : nak_count_ (0), timer_ (timer)
+ {
+ }
+
+ Descr (Message_ptr m)
+ : m_ (m)
+ {
+ }
+
+ public:
+ bool
+ lost () const
+ {
+ return m_.get () == 0;
+ }
+
+ public:
+ Message_ptr
+ msg ()
+ {
+ return m_;
+ }
+
+ void
+ msg (Message_ptr m)
+ {
+ m_ = m;
+ }
+
+ public:
+ unsigned long
+ nak_count () const
+ {
+ return nak_count_;
+ }
+
+ void
+ nak_count (unsigned long v)
+ {
+ nak_count_ = v;
+ }
+
+ unsigned long
+ timer () const
+ {
+ return timer_;
+ }
+
+ void
+ timer (unsigned long v)
+ {
+ timer_ = v;
+ }
+
+ private:
+ Message_ptr m_;
+
+ unsigned long nak_count_;
+ unsigned long timer_;
+ };
+
+ struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex>
+ {
+ typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base;
+
+ // Should never be here but required by ACE_Hash_Blah_Blah.
+ //
+ Queue ()
+ : Base (), sn_ (0), max_sn_ (0)
+ {
+ }
+
+ Queue (u64 sn)
+ : Base (), sn_ (sn), max_sn_ (sn)
+ {
+ }
+
+ Queue (Queue const& q)
+ : Base (), sn_ (q.sn_), max_sn_ (sn_)
+ {
+ for (Queue::const_iterator i (q), e (q, 1); i != e; ++i)
+ {
+ bind ((*i).ext_id_, (*i).int_id_);
+ }
+ }
+
+ public:
+ int
+ bind (u64 sn, Descr const& d)
+ {
+ int r (Base::bind (sn, d));
+
+ if (r == 0 && sn > max_sn_) max_sn_ = sn;
+
+ return r;
+ }
+
+ int
+ rebind (u64 sn, Descr const& d)
+ {
+ int r (Base::rebind (sn, d));
+
+ if (r == 0 && sn > max_sn_) max_sn_ = sn;
+
+ return r;
+ }
+
+ int
+ unbind (u64 sn)
+ {
+ int r (Base::unbind (sn));
+
+ if (r == 0 && sn == max_sn_)
+ {
+ for (--max_sn_; max_sn_ >= sn_; --max_sn_)
+ {
+ if (find (max_sn_) == 0) break;
+ }
+ }
+
+ return r;
+ }
+
+ public:
+ u64
+ sn () const
+ {
+ return sn_;
+ }
+
+ void
+ sn (u64 sn)
+ {
+ sn_ = sn;
+ }
+
+ u64
+ max_sn () const
+ {
+ if (current_size () == 0) return sn_;
+
+ return max_sn_;
+ }
+
+ private:
+ u64 sn_, max_sn_;
+ };
+
+ typedef
+ ACE_Hash_Map_Manager_Ex<Address,
+ Queue,
+ AddressHasher,
+ ACE_Equal_To<Address>,
+ ACE_Null_Mutex>
+ Map;
+
+ private:
+ void
+ collapse (Queue& q);
+
+ void
+ track ();
+
+ void
+ track_queue (Address const& addr, Queue& q, Messages& msgs);
+
+ Profile_ptr
+ create_nrtm ();
+
+ static ACE_THR_FUNC_RETURN
+ track_thunk (void* obj);
+
+ private:
+ Map hold_;
+ Mutex mutex_;
+
+ unsigned long nrtm_timer_;
+
+ ACE_Thread_Manager tracker_mgr_;
+ };
+
+
+
+}
+
+#endif // ACE_RMCAST_ACKNOWLEDGE_H
diff --git a/protocols/ace/RMCast/Agent.tar.bz2 b/protocols/ace/RMCast/Agent.tar.bz2
new file mode 100644
index 00000000000..173776420d9
--- /dev/null
+++ b/protocols/ace/RMCast/Agent.tar.bz2
Binary files differ
diff --git a/protocols/ace/RMCast/Bits.h b/protocols/ace/RMCast/Bits.h
new file mode 100644
index 00000000000..0c1090910ce
--- /dev/null
+++ b/protocols/ace/RMCast/Bits.h
@@ -0,0 +1,28 @@
+// file : ace/RMCast/Bits.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_BITS_H
+#define ACE_RMCAST_BITS_H
+
+#include <ace/Synch.h>
+#include <ace/Auto_Ptr.h>
+
+//#include <iostream>
+
+namespace ACE_RMCast
+{
+ typedef ACE_Thread_Mutex Mutex;
+ typedef ACE_Guard<Mutex> Lock;
+ typedef ACE_Condition<Mutex> Condition;
+
+ using ::auto_ptr; // ACE auto_ptr.
+
+ // tmp
+ //
+ //using std::cerr;
+ //using std::endl;
+}
+
+
+#endif // ACE_RMCAST_BITS_H
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp
new file mode 100644
index 00000000000..e6898cfd76c
--- /dev/null
+++ b/protocols/ace/RMCast/Link.cpp
@@ -0,0 +1,271 @@
+// file : ace/RMCast/Link.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/RMCast/Link.h>
+
+namespace ACE_RMCast
+{
+ Link::
+ Link (Address const& addr)
+ : addr_ (addr),
+ ssock_ (Address ((unsigned short) (0), INADDR_ANY),
+ AF_INET,
+ IPPROTO_UDP,
+ 1)
+
+ {
+ srand (time (0));
+
+
+ rsock_.set_option (IP_MULTICAST_LOOP, 0);
+
+ // Set recv/send buffers.
+ //
+ {
+ int r (131070);
+ int s (sizeof (r));
+
+ static_cast<ACE_SOCK&> (rsock_).set_option (
+ SOL_SOCKET, SO_RCVBUF, &r, s);
+
+ static_cast<ACE_SOCK&> (ssock_).set_option (
+ SOL_SOCKET, SO_RCVBUF, &r, s);
+
+ rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
+ //cerr << 5 << "recv buffer size: " << r << endl;
+
+ ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
+ //cerr << 5 << "send buffer size: " << r << endl;
+
+ }
+
+ // Bind address and port.
+ //
+ if (ACE_OS::connect (ssock_.get_handle (),
+ reinterpret_cast<sockaddr*> (addr_.get_addr ()),
+ addr_.get_addr_size ()) == -1)
+ {
+ perror ("connect: ");
+ abort ();
+ }
+
+
+ ssock_.get_local_addr (self_);
+
+ //cerr << 5 << "self: " << self_ << endl;
+ }
+
+ void Link::
+ in_start (In_Element* in)
+ {
+ Element::in_start (in);
+
+ rsock_.join (addr_);
+
+ // Start receiving thread.
+ //
+ recv_mgr_.spawn (recv_thunk, this);
+ }
+
+ void Link::
+ out_start (Out_Element* out)
+ {
+ Element::out_start (out);
+ }
+
+ void Link::
+ in_stop ()
+ {
+ // Stop receiving thread.
+ //
+ recv_mgr_.cancel_all (1);
+ recv_mgr_.wait ();
+
+ Element::in_stop ();
+ }
+
+ void Link::
+ send (Message_ptr m)
+ {
+ bool const sim = false;
+
+ // Simulate message loss and reordering.
+ //
+ if (sim)
+ {
+ if ((rand () % 5) != 0)
+ {
+ Lock l (mutex_);
+
+ if (hold_.get ())
+ {
+ send_ (m);
+ send_ (hold_);
+ hold_ = 0;
+ }
+ else
+ {
+ hold_ = m;
+
+ // Make a copy in M so that the reliable loop below
+ // won't add FROM and TO to HOLD_.
+ //
+ m = Message_ptr (new Message (*hold_));
+ }
+ }
+ }
+ else
+ send_ (m);
+
+ // Reliable loop.
+ //
+ m->add (Profile_ptr (new From (self_)));
+ m->add (Profile_ptr (new To (self_)));
+
+ in_->recv (m);
+ }
+
+ void Link::
+ send_ (Message_ptr m)
+ {
+ ostream os (m->size (), 1); // Always little-endian.
+
+ os << *m;
+
+ ssock_.send (os.buffer (), os.length (), addr_);
+
+ /*
+ if (m->find (nrtm::id))
+ {
+ write (1, os.buffer (), os.length ());
+ exit (1);
+ }
+ */
+ }
+
+ void Link::
+ recv ()
+ {
+ // I could have used ACE_Data_Block but it does not support
+ // resizing...
+ //
+ size_t size (0), capacity (8192);
+ char* data (reinterpret_cast<char*> (operator new (capacity)));
+
+ auto_ptr<char> holder (data); // This is wicked.
+
+ while (true)
+ {
+ //@@ Should I lock here?
+ //
+
+ Address addr;
+
+ //@@ CDR-specific.
+ //
+ size = rsock_.recv (data, 4, addr, MSG_PEEK);
+
+ if (size != 4 || addr == self_)
+ {
+ // Discard bad messages and ones from ourselvs since
+ // we are using reliable loopback.
+ //
+ rsock_.recv (data, 0, addr);
+ continue;
+ }
+
+ u32 msg_size;
+ {
+ istream is (data, size, 1); // Always little-endian.
+ is >> msg_size;
+ }
+
+ if (msg_size <= 4)
+ {
+ // Bad message.
+ //
+ rsock_.recv (data, 0, addr);
+ continue;
+ }
+
+ if (capacity < msg_size)
+ {
+ capacity = msg_size;
+ holder = auto_ptr<char> (0);
+ data = reinterpret_cast<char*> (operator new (capacity));
+ holder = auto_ptr<char> (data);
+ }
+
+ size = rsock_.recv (data, capacity, addr);
+
+ if (msg_size != size)
+ {
+ // Bad message.
+ //
+ continue;
+ }
+
+ //cerr << 6 << "from: " << addr << endl;
+
+ Message_ptr m (new Message ());
+
+ m->add (Profile_ptr (new From (addr)));
+ m->add (Profile_ptr (new To (self_)));
+
+ istream is (data, size, 1); // Always little-endian.
+
+ is >> msg_size;
+
+ while (true)
+ {
+ u16 id, size;
+
+ if (!((is >> id) && (is >> size))) break;
+
+ //cerr << 6 << "reading profile with id " << id << " "
+ // << size << " bytes long" << endl;
+
+ Profile::Header hdr (id, size);
+
+ switch (hdr.id ())
+ {
+ case SN::id:
+ {
+ m->add (Profile_ptr (new SN (hdr, is)));
+ break;
+ }
+ case Data::id:
+ {
+ m->add (Profile_ptr (new Data (hdr, is)));
+ break;
+ }
+ case NAK::id:
+ {
+ m->add (Profile_ptr (new NAK (hdr, is)));
+ break;
+ }
+ case NRTM::id:
+ {
+ m->add (Profile_ptr (new NRTM (hdr, is)));
+ break;
+ }
+ default:
+ {
+ //cerr << 0 << "unknown profile id " << hdr.id () << endl;
+ abort ();
+ }
+ }
+ }
+
+ in_->recv (m);
+ }
+ }
+
+ ACE_THR_FUNC_RETURN Link::
+ recv_thunk (void* obj)
+ {
+ reinterpret_cast<Link*> (obj)->recv ();
+ return 0;
+ }
+}
diff --git a/protocols/ace/RMCast/Link.h b/protocols/ace/RMCast/Link.h
new file mode 100644
index 00000000000..e6e66bbbd80
--- /dev/null
+++ b/protocols/ace/RMCast/Link.h
@@ -0,0 +1,62 @@
+// file : ace/RMCast/Link.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_LINK_H
+#define ACE_RMCAST_LINK_H
+
+#include <ace/SOCK_Dgram.h>
+#include <ace/SOCK_Dgram_Mcast.h>
+
+#include <ace/Thread_Manager.h>
+
+#include <ace/RMCast/Stack.h>
+#include <ace/RMCast/Protocol.h>
+
+namespace ACE_RMCast
+{
+ class Link : public Element
+ {
+ public:
+ Link (Address const& addr);
+
+ virtual void
+ in_start (In_Element* in);
+
+ virtual void
+ out_start (Out_Element* out);
+
+ virtual void
+ in_stop ();
+
+ public:
+ virtual void
+ send (Message_ptr m);
+
+ private:
+ virtual void
+ send_ (Message_ptr m);
+
+ private:
+ void
+ recv ();
+
+ static ACE_THR_FUNC_RETURN
+ recv_thunk (void* obj);
+
+ private:
+ Address addr_, self_;
+ ACE_SOCK_Dgram_Mcast rsock_;
+ ACE_SOCK_Dgram ssock_;
+
+ ACE_Thread_Manager recv_mgr_;
+
+ // Simulator.
+ //
+ Message_ptr hold_;
+ Mutex mutex_;
+ };
+}
+
+
+#endif // ACE_RMCAST_LINK_H
diff --git a/protocols/ace/RMCast/Protocol.cpp b/protocols/ace/RMCast/Protocol.cpp
new file mode 100644
index 00000000000..4082d6bd139
--- /dev/null
+++ b/protocols/ace/RMCast/Protocol.cpp
@@ -0,0 +1,9 @@
+// file : ace/RMCast/Protocol.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/RMCast/Protocol.h>
+
+namespace ACE_RMCast
+{
+}
diff --git a/protocols/ace/RMCast/Protocol.h b/protocols/ace/RMCast/Protocol.h
new file mode 100644
index 00000000000..f9f2956920f
--- /dev/null
+++ b/protocols/ace/RMCast/Protocol.h
@@ -0,0 +1,715 @@
+// file : ace/RMCast/Protocol.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_PROTOCOL_H
+#define ACE_RMCAST_PROTOCOL_H
+
+#include <ace/Refcounted_Auto_Ptr.h>
+
+#include <ace/Vector_T.h>
+#include <ace/Hash_Map_Manager.h>
+
+#include <ace/OS.h>
+#include <ace/CDR_Stream.h>
+#include <ace/INET_Addr.h>
+
+#include <ace/RMCast/Bits.h>
+
+namespace ACE_RMCast
+{
+ // Basic types.
+ //
+ typedef ACE_CDR::UShort u16;
+ typedef ACE_CDR::ULong u32;
+ typedef ACE_CDR::ULongLong u64;
+
+ typedef ACE_INET_Addr Address;
+
+ struct AddressHasher
+ {
+ unsigned long
+ operator() (Address const& a) const
+ {
+ unsigned long port (a.get_port_number ());
+ unsigned long ip (a.get_ip_address ());
+
+ port <<= sizeof (sizeof (unsigned long) - sizeof (unsigned short));
+
+ return port ^ ip;
+ }
+ };
+
+ //@@ Provide stream<< (Address const&)
+ //
+
+ typedef ACE_OutputCDR ostream;
+ typedef ACE_InputCDR istream;
+
+ struct Profile;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Profile, ACE_Null_Mutex>
+ Profile_ptr;
+
+ struct Profile
+ {
+ public:
+ class Header
+ {
+ public:
+ Header (u16 id, u16 size)
+ : id_ (id), size_ (size)
+ {
+ }
+
+ Header (istream& is)
+ {
+ is >> id_ >> size_;
+ }
+
+ public:
+ u16
+ id () const
+ {
+ return id_;
+ }
+
+ u16
+ size () const
+ {
+ return size_;
+ }
+
+ protected:
+ void
+ size (u16 s)
+ {
+ size_ = s;
+ }
+
+ friend class Profile;
+
+ private:
+ u16 id_;
+ u16 size_;
+ };
+
+ public:
+ virtual
+ ~Profile ()
+ {
+ }
+
+ protected:
+ Profile (u16 id, u16 size, u16 boundary)
+ : header_ (id, size), boundary_ (boundary)
+ {
+ }
+
+ Profile (Header const& h, u16 boundary)
+ : header_ (h), boundary_ (boundary)
+ {
+ }
+ public:
+ u16
+ id () const
+ {
+ return header_.id ();
+ }
+
+ u16
+ size () const
+ {
+ return header_.size ();
+ }
+
+ u16
+ boundary () const
+ {
+ return boundary_;
+ }
+
+ protected:
+ void
+ size (u16 s)
+ {
+ header_.size (s);
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream&) const = 0;
+
+ friend
+ ostream&
+ operator<< (ostream& os, Profile const& p);
+
+ private:
+ Header header_;
+ u16 boundary_;
+ };
+
+ inline
+ ostream&
+ operator<< (ostream& os, Profile::Header const& hdr)
+ {
+ os << hdr.id ();
+ os << hdr.size ();
+
+ return os;
+ }
+
+ inline
+ ostream&
+ operator<< (ostream& os, Profile const& p)
+ {
+ os << p.header_;
+ p.serialize_body (os);
+
+ return os;
+ }
+
+ //
+ //
+ //
+
+ class Message;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex>
+ Message_ptr;
+
+ class Message
+ {
+ public:
+ Message ()
+ : profiles_ (4)
+ {
+ }
+
+ Message (Message const& m)
+ : profiles_ (4)
+ {
+ for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ())
+ {
+ // Shallow copy of profiles.
+ //
+ profiles_.bind ((*i).ext_id_, (*i).int_id_);
+ }
+ }
+
+ public:
+ struct duplicate {};
+
+ void
+ add (Profile_ptr p)
+ {
+ u16 id (p->id ());
+
+ if (profiles_.find (id) == 0)
+ {
+ throw duplicate ();
+ }
+
+ profiles_.bind (id, p);
+ }
+
+ Profile const*
+ find (u16 id) const
+ {
+ Profiles::ENTRY* e;
+
+ if (profiles_.find (id, e) == -1) return 0;
+
+ return e->int_id_.get ();
+ }
+
+ public:
+ size_t
+ size () const
+ {
+ size_t s (4); // 4 is for size (u32)
+
+ for (Profiles::const_iterator i (profiles_); !i.done (); i.advance ())
+ {
+ //@@ This is so broken: in CDR the padding depends on
+ // what comes after.
+ //
+ s += s % 2; // Padding to the boundary of 2.
+ s += 4; // Profile header: u16 + u16
+ s += s % (*i).int_id_->boundary (); // Padding to the b. of profile body.
+ s += (*i).int_id_->size (); // Profile body.
+ }
+
+ return s;
+ }
+
+ friend
+ ostream&
+ operator<< (ostream& os, Message const& m)
+ {
+ u32 s (m.size ());
+
+ os << s;
+
+ for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ())
+ {
+ os << *((*i).int_id_);
+ }
+
+ return os;
+ }
+
+ typedef
+ ACE_Hash_Map_Manager<u16, Profile_ptr, ACE_Null_Mutex>
+ Profiles;
+
+ Profiles profiles_;
+ };
+
+ typedef
+ ACE_Vector<Message_ptr>
+ Messages;
+
+
+ //
+ //
+ //
+ struct From;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<From, ACE_Null_Mutex>
+ From_ptr;
+
+ struct From : Profile
+ {
+ static u16 const id = 0x0001;
+
+ public:
+ From (Header const& h, istream& is)
+ : Profile (h, 4)
+ {
+ u32 addr;
+ u16 port;
+
+ is >> addr;
+ is >> port;
+
+ address_ = Address (port, addr);
+ }
+
+ // 6 is CDR-specific.
+ //
+ From (Address const& addr)
+ : Profile (id, 6, 4), address_ (addr)
+ {
+ }
+
+ public:
+ Address const&
+ address () const
+ {
+ return address_;
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ u32 addr (address_.get_ip_address ());
+ u16 port (address_.get_port_number ());
+
+ os << addr;
+ os << port;
+ }
+
+ private:
+ Address address_;
+ };
+
+
+ //
+ //
+ //
+ struct To;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<To, ACE_Null_Mutex>
+ To_ptr;
+
+ struct To : Profile
+ {
+ static u16 const id = 0x0002;
+
+ public:
+ To (Header const& h, istream& is)
+ : Profile (h, 4)
+ {
+ u32 addr;
+ u16 port;
+
+ is >> addr;
+ is >> port;
+
+ address_ = Address (port, addr);
+ }
+
+ // 6 is CDR-specific.
+ //
+ To (Address const& addr)
+ : Profile (id, 6, 4), address_ (addr)
+ {
+ }
+
+ public:
+ Address const&
+ address () const
+ {
+ return address_;
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ u32 addr (address_.get_ip_address ());
+ u16 port (address_.get_port_number ());
+
+ os << addr;
+ os << port;
+ }
+
+ private:
+ Address address_;
+ };
+
+ //
+ //
+ //
+ struct Data;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Data, ACE_Null_Mutex>
+ Data_ptr;
+
+ struct Data : Profile
+ {
+ static u16 const id = 0x0003;
+
+ public:
+ Data (Header const& h, istream& is)
+ : Profile (h, 1), buf_ (0), size_ (h.size ())
+ {
+ if (size_)
+ {
+ buf_ = reinterpret_cast<char*> (operator new (size_));
+ is.read_char_array (buf_, size_);
+ }
+
+ }
+
+ Data (void const* buf, size_t s)
+ : Profile (id, s, 1), buf_ (0), size_ (s)
+ {
+ if (size_)
+ {
+ buf_ = reinterpret_cast<char*> (operator new (size_));
+ ACE_OS::memcpy (buf_, buf, size_);
+ }
+ }
+
+ public:
+ char const*
+ buf () const
+ {
+ return buf_;
+ }
+
+ size_t
+ size () const
+ {
+ return size_;
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ os.write_char_array (buf_, size_);
+ }
+
+ private:
+ char* buf_;
+ size_t size_;
+ };
+
+ //
+ //
+ //
+ struct SN;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<SN, ACE_Null_Mutex>
+ SN_ptr;
+
+ struct SN : Profile
+ {
+ static u16 const id = 0x0004;
+
+ public:
+ SN (Header const& h, istream& is)
+ : Profile (h, 8)
+ {
+ is >> n_;
+ }
+
+ // 8 is CDR-specific.
+ //
+ SN (u64 n)
+ : Profile (id, 8, 8), n_ (n)
+ {
+ }
+
+ public:
+ u64
+ num () const
+ {
+ return n_;
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ os << n_;
+ }
+
+ private:
+ u64 n_;
+ };
+
+
+ //
+ //
+ //
+ struct NAK;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<NAK, ACE_Null_Mutex>
+ NAK_ptr;
+
+ struct NAK : Profile
+ {
+ static u16 const id = 0x0005;
+
+ typedef
+ ACE_Vector<u64>
+ SerialNumbers;
+
+ typedef
+ SerialNumbers::Iterator
+ iterator;
+
+ public:
+ NAK (Header const& h, istream& is)
+ : Profile (h, 8)
+ {
+ //@@ All the numbers are CDR-specific.
+ //
+ // 8 = u32 + u16 + 2(padding to u64)
+ //
+ for (long i (0); i < ((h.size () - 8) / 8); ++i)
+ {
+ u64 sn;
+ is >> sn;
+ sns_.push_back (sn);
+ }
+
+ u32 addr;
+ u16 port;
+
+ is >> port;
+ is >> addr;
+
+ address_ = Address (port, addr);
+ }
+
+ // 8 is CDR-specific.
+ //
+ NAK (Address const& src)
+ : Profile (id, 8, 8), address_ (src)
+ {
+ }
+
+ public:
+ void
+ add (u64 sn)
+ {
+ sns_.push_back (sn);
+ size (size () + 8); //@@ 8 is CDR-specific
+ }
+
+ public:
+ Address const&
+ address () const
+ {
+ return address_;
+ }
+
+
+ iterator
+ begin () /* const */
+ {
+ return iterator (sns_);
+ }
+
+ /*
+ iterator
+ end () const
+ {
+ return sns_.end ();
+ }
+ */
+
+ size_t
+ count () const
+ {
+ return sns_.size ();
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ NAK& this_ (const_cast<NAK&> (*this)); // Don't put in ROM.
+
+ // Stone age iteration.
+ //
+ for (iterator i (this_.begin ()); !i.done (); i.advance ())
+ {
+ u64* psn;
+ i.next (psn);
+ os << *psn;
+ }
+
+
+ u32 addr (address_.get_ip_address ());
+ u16 port (address_.get_port_number ());
+
+ os << port;
+ os << addr;
+ }
+
+ private:
+ Address address_;
+ SerialNumbers sns_;
+ };
+
+ //
+ //
+ //
+ struct NRTM;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<NRTM, ACE_Null_Mutex>
+ NRTM_ptr;
+
+ struct NRTM : Profile
+ {
+ static u16 const id = 0x0006;
+
+ public:
+ NRTM (Header const& h, istream& is)
+ : Profile (h, 8), map_ (10)
+ {
+ //@@ 16 is CDR-specific.
+ //
+ // 16 = u32 + u16 + 2(padding to u64) + u64
+ for (u16 i (0); i < (h.size () / 16); ++i)
+ {
+ u32 addr;
+ u16 port;
+ u64 sn;
+
+ is >> sn;
+ is >> port;
+ is >> addr;
+
+ map_.bind (Address (port, addr), sn);
+ }
+ }
+
+ NRTM ()
+ : Profile (id, 0, 8), map_ (10)
+ {
+ }
+
+ public:
+ void
+ insert (Address const& addr, u64 sn)
+ {
+ map_.bind (addr, sn);
+
+ size (size () + 16); //@@ 16 is CDR-specific.
+ }
+
+ u64
+ find (Address const& addr) const
+ {
+ u64 sn;
+
+ if (map_.find (addr, sn) == -1) return 0;
+
+ return sn;
+ }
+
+ bool
+ empty () const
+ {
+ return map_.current_size () == 0;
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ for (Map::const_iterator i (map_), e (map_, 1); i != e; ++i)
+ {
+ u32 addr ((*i).ext_id_.get_ip_address ());
+ u16 port ((*i).ext_id_.get_port_number ());
+ u64 sn ((*i).int_id_);
+
+ os << sn;
+ os << port;
+ os << addr;
+ }
+ }
+
+ private:
+ typedef
+ ACE_Hash_Map_Manager_Ex<Address,
+ u64,
+ AddressHasher,
+ ACE_Equal_To<Address>,
+ ACE_Null_Mutex>
+ Map;
+
+ Map map_;
+ };
+
+}
+
+/*
+inline
+std::ostream&
+operator<< (std::ostream& os, RMCast::Address const& a)
+{
+ char buf[64];
+ a.addr_to_string (buf, 64, 1);
+ return os << buf;
+}
+*/
+
+
+#endif // ACE_RMCAST_PROTOCOL_H
diff --git a/protocols/ace/RMCast/RMCast.mpc b/protocols/ace/RMCast/RMCast.mpc
new file mode 100644
index 00000000000..5ba1bd21b2d
--- /dev/null
+++ b/protocols/ace/RMCast/RMCast.mpc
@@ -0,0 +1,7 @@
+// -*- MPC -*-
+// $Id$
+
+project(RMCast) : acelib, core {
+ sharedname = ACE_RMCast
+ dynamicflags = ACE_RMCAST_BUILD_DLL
+}
diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp
new file mode 100644
index 00000000000..e04cec2f689
--- /dev/null
+++ b/protocols/ace/RMCast/Retransmit.cpp
@@ -0,0 +1,132 @@
+// file : ace/RMCast/Retransmit.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/OS.h>
+
+#include <ace/RMCast/Retransmit.h>
+
+namespace ACE_RMCast
+{
+ ACE_Time_Value const tick (0, 50000);
+
+ Retransmit::
+ Retransmit ()
+ {
+ }
+
+ void Retransmit::
+ out_start (Out_Element* out)
+ {
+ Element::out_start (out);
+
+ tracker_mgr_.spawn (track_thunk, this);
+ }
+
+ void Retransmit::
+ out_stop ()
+ {
+ tracker_mgr_.cancel_all (1);
+ tracker_mgr_.wait ();
+
+ Element::out_stop ();
+ }
+
+ void Retransmit::
+ send (Message_ptr m)
+ {
+ if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
+ {
+ u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
+
+ Lock l (mutex_);
+
+ queue_.bind (sn, Descr (new Data (*data)));
+ }
+
+ out_->send (m);
+ }
+
+ void Retransmit::
+ recv (Message_ptr m)
+ {
+ if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
+ {
+ Address to (static_cast<To const*> (m->find (To::id))->address ());
+
+ if (nak->address () == to)
+ {
+ Lock l (mutex_);
+
+ for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
+ !j.done ();
+ j.advance ())
+ {
+ u64* psn;
+ j.next (psn);
+
+
+ Queue::ENTRY* pair;
+
+ if (queue_.find (*psn, pair) == 0)
+ {
+ //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
+
+ Message_ptr m (new Message);
+
+ m->add (Profile_ptr (new SN (pair->ext_id_)));
+ m->add (pair->int_id_.data ());
+
+ pair->int_id_.reset ();
+
+ out_->send (m);
+ }
+ else
+ {
+ //@@ TODO: message aging
+ //
+ //cerr << 4 << "message " << *psn << " not available" << endl;
+ }
+ }
+ }
+ }
+ else
+ {
+ in_->recv (m);
+ }
+ }
+
+ ACE_THR_FUNC_RETURN Retransmit::
+ track_thunk (void* obj)
+ {
+ reinterpret_cast<Retransmit*> (obj)->track ();
+ return 0;
+ }
+
+ void Retransmit::
+ track ()
+ {
+ while (true)
+ {
+ {
+ Lock l (mutex_);
+
+ for (Queue::iterator i (queue_); !i.done ();)
+ {
+ if ((*i).int_id_.inc () >= 60)
+ {
+ u64 sn ((*i).ext_id_);
+ i.advance ();
+ queue_.unbind (sn);
+ }
+ else
+ {
+ i.advance ();
+ }
+ }
+ }
+
+ ACE_OS::sleep (tick);
+ }
+ }
+}
diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h
new file mode 100644
index 00000000000..6348ba66009
--- /dev/null
+++ b/protocols/ace/RMCast/Retransmit.h
@@ -0,0 +1,97 @@
+// file : ace/RMCast/Retransmit.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_RETRANSMIT_H
+#define ACE_RMCAST_RETRANSMIT_H
+
+#include <ace/Hash_Map_Manager.h>
+
+#include <ace/Thread_Manager.h>
+
+#include <ace/RMCast/Stack.h>
+#include <ace/RMCast/Protocol.h>
+#include <ace/RMCast/Bits.h>
+
+namespace ACE_RMCast
+{
+ class Retransmit : public Element
+ {
+ public:
+ Retransmit ();
+
+ virtual void
+ out_start (Out_Element* out);
+
+ virtual void
+ out_stop ();
+
+ public:
+ virtual void
+ send (Message_ptr m);
+
+ virtual void
+ recv (Message_ptr m);
+
+ private:
+ struct Descr
+ {
+ // Shouldn't be available but ACE_Hash_Map needs it.
+ //
+ Descr ()
+ : data_ (), count_ (0)
+ {
+ }
+
+ Descr (Data_ptr d)
+ : data_ (d), count_ (0)
+ {
+ }
+
+ unsigned long
+ inc ()
+ {
+ return ++count_;
+ }
+
+ void
+ reset ()
+ {
+ count_ = 0;
+ }
+
+ // It would be logical to return data_ptr but ACE ref_auto_ptr
+ // hasn't learned how to convert between pointers yet.
+ //
+ Profile_ptr
+ data () const
+ {
+ return Profile_ptr (new Data (*data_));
+ }
+
+ private:
+ Data_ptr data_;
+ unsigned long count_;
+ };
+
+ typedef
+ ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex>
+ Queue;
+
+ private:
+ void
+ track ();
+
+ static ACE_THR_FUNC_RETURN
+ track_thunk (void* obj);
+
+ private:
+ Queue queue_;
+ Mutex mutex_;
+
+ ACE_Thread_Manager tracker_mgr_;
+ };
+}
+
+
+#endif // ACE_RMCAST_RETRANSMIT_H
diff --git a/protocols/ace/RMCast/Simulator.cpp b/protocols/ace/RMCast/Simulator.cpp
new file mode 100644
index 00000000000..4b08f37f3da
--- /dev/null
+++ b/protocols/ace/RMCast/Simulator.cpp
@@ -0,0 +1,40 @@
+// file : ace/RMCast/Simulator.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/RMCast/Simulator.h>
+
+namespace ACE_RMCast
+{
+ Simulator::
+ Simulator ()
+ {
+ srand (time (0));
+ }
+
+ void Simulator::
+ send (Message_ptr m)
+ {
+ // Note: Simulator may work in unpredictable ways mainly due
+ // to the "reliable loopback" mechanism.
+ //
+ out_->send (m);
+ return;
+
+ int r (rand ());
+
+ if ((r % 3) == 0) return;
+
+ Lock l (mutex_);
+
+ if (hold_.get ())
+ {
+ out_->send (m);
+ out_->send (hold_);
+ }
+ else
+ {
+ hold_ = m;
+ }
+ }
+}
diff --git a/protocols/ace/RMCast/Simulator.h b/protocols/ace/RMCast/Simulator.h
new file mode 100644
index 00000000000..9e19f26bb8b
--- /dev/null
+++ b/protocols/ace/RMCast/Simulator.h
@@ -0,0 +1,30 @@
+// file : ace/RMCast/Simulator.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_SIMULATOR_H
+#define ACE_RMCAST_SIMULATOR_H
+
+#include <ace/RMCast/Stack.h>
+#include <ace/RMCast/Protocol.h>
+#include <ace/RMCast/Bits.h>
+
+namespace ACE_RMCast
+{
+ class Simulator : public Element
+ {
+ public:
+ Simulator ();
+
+ public:
+ virtual void
+ send (Message_ptr m);
+
+ private:
+ Message_ptr hold_;
+ Mutex mutex_;
+ };
+}
+
+
+#endif // ACE_RMCAST_SIMULATOR_H
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
new file mode 100644
index 00000000000..640a1a7d694
--- /dev/null
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -0,0 +1,112 @@
+// file : ace/RMCast/Socket.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/OS.h>
+
+#include <ace/RMCast/Socket.h>
+
+namespace ACE_RMCast
+{
+ Socket::
+ Socket (Address const& a, bool loop)
+ : loop_ (loop), sn_ (1), cond_ (mutex_)
+ {
+ acknowledge_ = auto_ptr<Acknowledge> (new Acknowledge ());
+ retransmit_ = auto_ptr<Retransmit> (new Retransmit ());
+ simulator_ = auto_ptr<Simulator> (new Simulator ());
+ link_ = auto_ptr<Link> (new Link (a));
+
+ // Start IN stack from top to bottom.
+ //
+ in_start (0);
+ acknowledge_->in_start (this);
+ retransmit_->in_start (acknowledge_.get ());
+ simulator_->in_start (retransmit_.get ());
+ link_->in_start (simulator_.get ());
+
+ // Start OUT stack from bottom up.
+ //
+ link_->out_start (0);
+ simulator_->out_start (link_.get ());
+ retransmit_->out_start (simulator_.get ());
+ acknowledge_->out_start (retransmit_.get ());
+ out_start (acknowledge_.get ());
+ }
+
+ Socket::
+ ~Socket ()
+ {
+ // Stop OUT stack from top to bottom.
+ //
+ out_stop ();
+ acknowledge_->out_stop ();
+ retransmit_->out_stop ();
+ simulator_->out_stop ();
+ link_->out_stop ();
+
+ // Stop IN stack from bottom up.
+ //
+ link_->in_stop ();
+ simulator_->in_stop ();
+ retransmit_->in_stop ();
+ acknowledge_->in_stop ();
+ in_stop ();
+ }
+
+
+ void Socket::
+ send (void const* buf, size_t s)
+ {
+ Message_ptr m (new Message);
+
+ m->add (Profile_ptr (new SN (sn_++)));
+ m->add (Profile_ptr (new Data (buf, s)));
+
+ send (m);
+ }
+
+ size_t Socket::
+ recv (void* buf, size_t s)
+ {
+ Lock l (mutex_);
+
+ while (queue_.is_empty ()) cond_.wait ();
+
+ Message_ptr m;
+ if (queue_.dequeue_head (m) == -1) abort ();
+
+ Data const* d (static_cast<Data const*>(m->find (Data::id)));
+
+ size_t r (d->size () < s ? d->size () : s);
+
+ ACE_OS::memcpy (buf, d->buf (), r);
+
+ return r;
+ }
+
+ void Socket::
+ recv (Message_ptr m)
+ {
+ if (m->find (Data::id) != 0)
+ {
+ if (!loop_)
+ {
+ Address to (static_cast<To const*> (m->find (To::id))->address ());
+
+ Address from (
+ static_cast<From const*> (m->find (From::id))->address ());
+
+ if (to == from) return;
+ }
+
+ Lock l (mutex_);
+
+ bool signal (queue_.is_empty ());
+
+ queue_.enqueue_tail (m);
+
+ if (signal) cond_.signal ();
+ }
+ }
+}
diff --git a/protocols/ace/RMCast/Stack.cpp b/protocols/ace/RMCast/Stack.cpp
new file mode 100644
index 00000000000..729e289cf0e
--- /dev/null
+++ b/protocols/ace/RMCast/Stack.cpp
@@ -0,0 +1,9 @@
+// file : ace/RMCast/Stack.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/RMCast/Stack.h>
+
+namespace ACE_RMCast
+{
+}
diff --git a/protocols/ace/RMCast/Stack.h b/protocols/ace/RMCast/Stack.h
new file mode 100644
index 00000000000..e371370977d
--- /dev/null
+++ b/protocols/ace/RMCast/Stack.h
@@ -0,0 +1,87 @@
+// file : ace/RMCast/Stack.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_STACK_H
+#define ACE_RMCAST_STACK_H
+
+#include <ace/RMCast/Protocol.h>
+
+namespace ACE_RMCast
+{
+ struct Out_Element
+ {
+ virtual
+ ~Out_Element ()
+ {
+ }
+
+ Out_Element ()
+ : out_ (0)
+ {
+ }
+
+ virtual void
+ out_start (Out_Element* out)
+ {
+ out_ = out;
+ }
+
+ virtual void
+ send (Message_ptr m)
+ {
+ if (out_) out_->send (m);
+ }
+
+ virtual void
+ out_stop ()
+ {
+ out_ = 0;
+ }
+
+ protected:
+ Out_Element* out_;
+ };
+
+
+ struct In_Element
+ {
+ virtual
+ ~In_Element ()
+ {
+ }
+
+ In_Element ()
+ : in_ (0)
+ {
+ }
+
+ virtual void
+ in_start (In_Element* in)
+ {
+ in_ = in;
+ }
+
+ virtual void
+ recv (Message_ptr m)
+ {
+ if (in_) in_->recv (m);
+ }
+
+ virtual void
+ in_stop ()
+ {
+ in_ = 0;
+ }
+
+ protected:
+ In_Element* in_;
+ };
+
+ struct Element : In_Element, Out_Element
+ {
+ };
+
+}
+
+#endif // ACE_RMCAST_STACK_H