diff options
author | Ossama Othman <ossama-othman@users.noreply.github.com> | 2005-02-08 08:33:44 +0000 |
---|---|---|
committer | Ossama Othman <ossama-othman@users.noreply.github.com> | 2005-02-08 08:33:44 +0000 |
commit | ab82f89dc8fde2725888b2577b6c44f113d3040f (patch) | |
tree | 8a9759f3be81fe4eba3a3d42e82609af907c60a4 /protocols | |
parent | 1b01c3c7544d1454ced15a8eae026f96758102a8 (diff) | |
download | ATCD-ab82f89dc8fde2725888b2577b6c44f113d3040f.tar.gz |
ChangeLogTag:Tue Feb 8 00:22:48 2005 Ossama Othman <ossama@dre.vanderbilt.edu>unlabeled-1.1.2
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.cpp | 315 | ||||
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.h | 234 | ||||
-rw-r--r-- | protocols/ace/RMCast/Agent.tar.bz2 | bin | 0 -> 8046 bytes | |||
-rw-r--r-- | protocols/ace/RMCast/Bits.h | 28 | ||||
-rw-r--r-- | protocols/ace/RMCast/Link.cpp | 271 | ||||
-rw-r--r-- | protocols/ace/RMCast/Link.h | 62 | ||||
-rw-r--r-- | protocols/ace/RMCast/Protocol.cpp | 9 | ||||
-rw-r--r-- | protocols/ace/RMCast/Protocol.h | 715 | ||||
-rw-r--r-- | protocols/ace/RMCast/RMCast.mpc | 7 | ||||
-rw-r--r-- | protocols/ace/RMCast/Retransmit.cpp | 132 | ||||
-rw-r--r-- | protocols/ace/RMCast/Retransmit.h | 97 | ||||
-rw-r--r-- | protocols/ace/RMCast/Simulator.cpp | 40 | ||||
-rw-r--r-- | protocols/ace/RMCast/Simulator.h | 30 | ||||
-rw-r--r-- | protocols/ace/RMCast/Socket.cpp | 112 | ||||
-rw-r--r-- | protocols/ace/RMCast/Stack.cpp | 9 | ||||
-rw-r--r-- | protocols/ace/RMCast/Stack.h | 87 |
16 files changed, 2148 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp new file mode 100644 index 00000000000..e0d7ce6f376 --- /dev/null +++ b/protocols/ace/RMCast/Acknowledge.cpp @@ -0,0 +1,315 @@ +// file : ace/RMCast/Acknowledge.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/OS.h> + +#include <ace/RMCast/Acknowledge.h> + +namespace ACE_RMCast +{ + ACE_Time_Value const tick (0, 5000); + unsigned long const nak_timeout = 20; // # of ticks. + unsigned long const nrtm_timeout = 50; // # of ticks. + + Acknowledge:: + Acknowledge () + : nrtm_timer_ (nrtm_timeout) + { + } + + void Acknowledge:: + in_start (In_Element* in) + { + Element::in_start (in); + } + + void Acknowledge:: + out_start (Out_Element* out) + { + Element::out_start (out); + + tracker_mgr_.spawn (track_thunk, this); + } + + void Acknowledge:: + out_stop () + { + tracker_mgr_.cancel_all (1); + tracker_mgr_.wait (); + + Element::out_stop (); + } + + void Acknowledge:: + collapse (Queue& q) + { + // I would normally use iterators in the logic below but ACE_Map_Manager + // iterates over entries in no particular order so it is pretty much + // unusable here. Instead we will do slow and cumbersome find's. + // + + u64 sn (q.sn () + 1); + + for (;; ++sn) + { + Queue::ENTRY* e; + + if (q.find (sn, e) == -1 || e->int_id_.lost ()) break; + + Message_ptr m (e->int_id_.msg ()); + q.unbind (sn); + + in_->recv (m); + } + + q.sn (sn - 1); + } + + void Acknowledge:: + track () + { + while (true) + { + Messages msgs; + + { + Lock l (mutex_); + + if (hold_.current_size () != 0) + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); + i != e; + ++i) + { + Queue& q ((*i).int_id_); + + if (q.current_size () == 0) continue; + + track_queue ((*i).ext_id_, q, msgs); + } + } + + if (--nrtm_timer_ == 0) + { + nrtm_timer_ = nrtm_timeout; + + // Send NRTM. + // + Profile_ptr nrtm (create_nrtm ()); + + if (nrtm.get ()) + { + Message_ptr m (new Message); + m->add (nrtm); + msgs.push_back (m); + + } + } + } + + // Send stuff off. + // + for (Messages::Iterator i (msgs); !i.done (); i.advance ()) + { + Message_ptr* ppm; + i.next (ppm); + send (*ppm); + } + + ACE_OS::sleep (tick); + } + } + + void Acknowledge:: + track_queue (Address const& addr, Queue& q, Messages& msgs) + { + NAK_ptr nak (new NAK (addr)); + + // Track existing losses. + // + for (Queue::iterator i (q.begin ()), e (q.end ()); i != e; ++i) + { + u64 sn ((*i).ext_id_); + Descr& d ((*i).int_id_); + + if (d.lost ()) + { + d.timer (d.timer () - 1); + + if (d.timer () == 0) + { + //@@ Need exp fallback. + // + d.nak_count (d.nak_count () + 1); + d.timer ((d.nak_count () + 1) * nak_timeout); + + nak->add (sn); + + //cerr << 6 << "NAK # " << d.nak_count () << ": " + // << addr << " " << sn << endl; + } + } + } + + // Send NAK. + // + if (nak->count ()) + { + // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" + // << endl; + + Message_ptr m (new Message); + + m->add (Profile_ptr (nak.release ())); + + msgs.push_back (m); + } + + // Detect and record new losses. + // + for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) + { + if (q.find (sn) == -1) + { + q.bind (sn, Descr (1)); + } + } + } + + void Acknowledge:: + recv (Message_ptr m) + { + // Handle NRTM. There could be some nasty interaction with code + // that handles data below (like missing message and NAK). This + // is why I hold the lock at the beginning (which may be not very + // efficient). + // + Lock l (mutex_); + + if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id))) + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) + { + u64 sn (nrtm->find ((*i).ext_id_)); + + if (sn != 0) + { + Queue& q ((*i).int_id_); + + u64 old (q.max_sn ()); + + if (old < sn) + { + // Mark as lost. + // + q.bind (sn, Descr (1)); + } + } + } + } + + if (m->find (Data::id)) + { + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); + + Map::ENTRY* e; + + if (hold_.find (from, e) == -1) + { + // First message from this source. + // + hold_.bind (from, Queue (sn)); + hold_.find (from, e); + + in_->recv (m); + } + else + { + Queue& q (e->int_id_); + + if (sn <= q.sn ()) + { + // Duplicate. + // + //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn + // << endl; + } + else if (sn == q.sn () + 1) + { + // Next message. + // + + q.rebind (sn, Descr (m)); + collapse (q); + } + else + { + // Some messages are missing. Insert this one into the queue. + // + q.rebind (sn, Descr (m)); + } + } + } + else + { + l.release (); + + // Just forward it up. + // + in_->recv (m); + } + } + + void Acknowledge:: + send (Message_ptr m) + { + if (m->find (Data::id) != 0) + { + Lock l (mutex_); + + Profile_ptr nrtm (create_nrtm ()); + + if (nrtm.get ()) m->add (nrtm); + + nrtm_timer_ = nrtm_timeout; // Reset timer. + } + + out_->send (m); + } + + Profile_ptr Acknowledge:: + create_nrtm () + { + // Prepare NRTM. + // + NRTM_ptr nrtm (new NRTM ()); + + // Gather the information. + // + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) + { + Address addr ((*i).ext_id_); + Queue& q ((*i).int_id_); + + //@@ Should look for the highest known number. + // + nrtm->insert (addr, q.sn ()); + } + } + + if (nrtm->empty ()) return 0; + else return Profile_ptr (nrtm.release ()); + } + + ACE_THR_FUNC_RETURN Acknowledge:: + track_thunk (void* obj) + { + reinterpret_cast<Acknowledge*> (obj)->track (); + return 0; + } +} diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h new file mode 100644 index 00000000000..4289ec729db --- /dev/null +++ b/protocols/ace/RMCast/Acknowledge.h @@ -0,0 +1,234 @@ +// file : ace/RMCast/Acknowledge.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_ACKNOWLEDGE_H +#define ACE_RMCAST_ACKNOWLEDGE_H + +#include <ace/Hash_Map_Manager.h> +#include <ace/Thread_Manager.h> + +#include <ace/RMCast/Stack.h> +#include <ace/RMCast/Protocol.h> +#include <ace/RMCast/Bits.h> + +namespace ACE_RMCast +{ + class Acknowledge : public Element + { + public: + Acknowledge (); + + virtual void + in_start (In_Element* in); + + virtual void + out_start (Out_Element* out); + + virtual void + out_stop (); + + public: + virtual void + recv (Message_ptr m); + + virtual void + send (Message_ptr m); + + private: + struct Descr + { + //@@ There should be no default c-tor. + // + Descr () + : nak_count_ (0), timer_ (1) + { + } + + Descr (unsigned long timer) + : nak_count_ (0), timer_ (timer) + { + } + + Descr (Message_ptr m) + : m_ (m) + { + } + + public: + bool + lost () const + { + return m_.get () == 0; + } + + public: + Message_ptr + msg () + { + return m_; + } + + void + msg (Message_ptr m) + { + m_ = m; + } + + public: + unsigned long + nak_count () const + { + return nak_count_; + } + + void + nak_count (unsigned long v) + { + nak_count_ = v; + } + + unsigned long + timer () const + { + return timer_; + } + + void + timer (unsigned long v) + { + timer_ = v; + } + + private: + Message_ptr m_; + + unsigned long nak_count_; + unsigned long timer_; + }; + + struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> + { + typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base; + + // Should never be here but required by ACE_Hash_Blah_Blah. + // + Queue () + : Base (), sn_ (0), max_sn_ (0) + { + } + + Queue (u64 sn) + : Base (), sn_ (sn), max_sn_ (sn) + { + } + + Queue (Queue const& q) + : Base (), sn_ (q.sn_), max_sn_ (sn_) + { + for (Queue::const_iterator i (q), e (q, 1); i != e; ++i) + { + bind ((*i).ext_id_, (*i).int_id_); + } + } + + public: + int + bind (u64 sn, Descr const& d) + { + int r (Base::bind (sn, d)); + + if (r == 0 && sn > max_sn_) max_sn_ = sn; + + return r; + } + + int + rebind (u64 sn, Descr const& d) + { + int r (Base::rebind (sn, d)); + + if (r == 0 && sn > max_sn_) max_sn_ = sn; + + return r; + } + + int + unbind (u64 sn) + { + int r (Base::unbind (sn)); + + if (r == 0 && sn == max_sn_) + { + for (--max_sn_; max_sn_ >= sn_; --max_sn_) + { + if (find (max_sn_) == 0) break; + } + } + + return r; + } + + public: + u64 + sn () const + { + return sn_; + } + + void + sn (u64 sn) + { + sn_ = sn; + } + + u64 + max_sn () const + { + if (current_size () == 0) return sn_; + + return max_sn_; + } + + private: + u64 sn_, max_sn_; + }; + + typedef + ACE_Hash_Map_Manager_Ex<Address, + Queue, + AddressHasher, + ACE_Equal_To<Address>, + ACE_Null_Mutex> + Map; + + private: + void + collapse (Queue& q); + + void + track (); + + void + track_queue (Address const& addr, Queue& q, Messages& msgs); + + Profile_ptr + create_nrtm (); + + static ACE_THR_FUNC_RETURN + track_thunk (void* obj); + + private: + Map hold_; + Mutex mutex_; + + unsigned long nrtm_timer_; + + ACE_Thread_Manager tracker_mgr_; + }; + + + +} + +#endif // ACE_RMCAST_ACKNOWLEDGE_H diff --git a/protocols/ace/RMCast/Agent.tar.bz2 b/protocols/ace/RMCast/Agent.tar.bz2 Binary files differnew file mode 100644 index 00000000000..173776420d9 --- /dev/null +++ b/protocols/ace/RMCast/Agent.tar.bz2 diff --git a/protocols/ace/RMCast/Bits.h b/protocols/ace/RMCast/Bits.h new file mode 100644 index 00000000000..0c1090910ce --- /dev/null +++ b/protocols/ace/RMCast/Bits.h @@ -0,0 +1,28 @@ +// file : ace/RMCast/Bits.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_BITS_H +#define ACE_RMCAST_BITS_H + +#include <ace/Synch.h> +#include <ace/Auto_Ptr.h> + +//#include <iostream> + +namespace ACE_RMCast +{ + typedef ACE_Thread_Mutex Mutex; + typedef ACE_Guard<Mutex> Lock; + typedef ACE_Condition<Mutex> Condition; + + using ::auto_ptr; // ACE auto_ptr. + + // tmp + // + //using std::cerr; + //using std::endl; +} + + +#endif // ACE_RMCAST_BITS_H diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp new file mode 100644 index 00000000000..e6898cfd76c --- /dev/null +++ b/protocols/ace/RMCast/Link.cpp @@ -0,0 +1,271 @@ +// file : ace/RMCast/Link.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/RMCast/Link.h> + +namespace ACE_RMCast +{ + Link:: + Link (Address const& addr) + : addr_ (addr), + ssock_ (Address ((unsigned short) (0), INADDR_ANY), + AF_INET, + IPPROTO_UDP, + 1) + + { + srand (time (0)); + + + rsock_.set_option (IP_MULTICAST_LOOP, 0); + + // Set recv/send buffers. + // + { + int r (131070); + int s (sizeof (r)); + + static_cast<ACE_SOCK&> (rsock_).set_option ( + SOL_SOCKET, SO_RCVBUF, &r, s); + + static_cast<ACE_SOCK&> (ssock_).set_option ( + SOL_SOCKET, SO_RCVBUF, &r, s); + + rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); + //cerr << 5 << "recv buffer size: " << r << endl; + + ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); + //cerr << 5 << "send buffer size: " << r << endl; + + } + + // Bind address and port. + // + if (ACE_OS::connect (ssock_.get_handle (), + reinterpret_cast<sockaddr*> (addr_.get_addr ()), + addr_.get_addr_size ()) == -1) + { + perror ("connect: "); + abort (); + } + + + ssock_.get_local_addr (self_); + + //cerr << 5 << "self: " << self_ << endl; + } + + void Link:: + in_start (In_Element* in) + { + Element::in_start (in); + + rsock_.join (addr_); + + // Start receiving thread. + // + recv_mgr_.spawn (recv_thunk, this); + } + + void Link:: + out_start (Out_Element* out) + { + Element::out_start (out); + } + + void Link:: + in_stop () + { + // Stop receiving thread. + // + recv_mgr_.cancel_all (1); + recv_mgr_.wait (); + + Element::in_stop (); + } + + void Link:: + send (Message_ptr m) + { + bool const sim = false; + + // Simulate message loss and reordering. + // + if (sim) + { + if ((rand () % 5) != 0) + { + Lock l (mutex_); + + if (hold_.get ()) + { + send_ (m); + send_ (hold_); + hold_ = 0; + } + else + { + hold_ = m; + + // Make a copy in M so that the reliable loop below + // won't add FROM and TO to HOLD_. + // + m = Message_ptr (new Message (*hold_)); + } + } + } + else + send_ (m); + + // Reliable loop. + // + m->add (Profile_ptr (new From (self_))); + m->add (Profile_ptr (new To (self_))); + + in_->recv (m); + } + + void Link:: + send_ (Message_ptr m) + { + ostream os (m->size (), 1); // Always little-endian. + + os << *m; + + ssock_.send (os.buffer (), os.length (), addr_); + + /* + if (m->find (nrtm::id)) + { + write (1, os.buffer (), os.length ()); + exit (1); + } + */ + } + + void Link:: + recv () + { + // I could have used ACE_Data_Block but it does not support + // resizing... + // + size_t size (0), capacity (8192); + char* data (reinterpret_cast<char*> (operator new (capacity))); + + auto_ptr<char> holder (data); // This is wicked. + + while (true) + { + //@@ Should I lock here? + // + + Address addr; + + //@@ CDR-specific. + // + size = rsock_.recv (data, 4, addr, MSG_PEEK); + + if (size != 4 || addr == self_) + { + // Discard bad messages and ones from ourselvs since + // we are using reliable loopback. + // + rsock_.recv (data, 0, addr); + continue; + } + + u32 msg_size; + { + istream is (data, size, 1); // Always little-endian. + is >> msg_size; + } + + if (msg_size <= 4) + { + // Bad message. + // + rsock_.recv (data, 0, addr); + continue; + } + + if (capacity < msg_size) + { + capacity = msg_size; + holder = auto_ptr<char> (0); + data = reinterpret_cast<char*> (operator new (capacity)); + holder = auto_ptr<char> (data); + } + + size = rsock_.recv (data, capacity, addr); + + if (msg_size != size) + { + // Bad message. + // + continue; + } + + //cerr << 6 << "from: " << addr << endl; + + Message_ptr m (new Message ()); + + m->add (Profile_ptr (new From (addr))); + m->add (Profile_ptr (new To (self_))); + + istream is (data, size, 1); // Always little-endian. + + is >> msg_size; + + while (true) + { + u16 id, size; + + if (!((is >> id) && (is >> size))) break; + + //cerr << 6 << "reading profile with id " << id << " " + // << size << " bytes long" << endl; + + Profile::Header hdr (id, size); + + switch (hdr.id ()) + { + case SN::id: + { + m->add (Profile_ptr (new SN (hdr, is))); + break; + } + case Data::id: + { + m->add (Profile_ptr (new Data (hdr, is))); + break; + } + case NAK::id: + { + m->add (Profile_ptr (new NAK (hdr, is))); + break; + } + case NRTM::id: + { + m->add (Profile_ptr (new NRTM (hdr, is))); + break; + } + default: + { + //cerr << 0 << "unknown profile id " << hdr.id () << endl; + abort (); + } + } + } + + in_->recv (m); + } + } + + ACE_THR_FUNC_RETURN Link:: + recv_thunk (void* obj) + { + reinterpret_cast<Link*> (obj)->recv (); + return 0; + } +} diff --git a/protocols/ace/RMCast/Link.h b/protocols/ace/RMCast/Link.h new file mode 100644 index 00000000000..e6e66bbbd80 --- /dev/null +++ b/protocols/ace/RMCast/Link.h @@ -0,0 +1,62 @@ +// file : ace/RMCast/Link.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_LINK_H +#define ACE_RMCAST_LINK_H + +#include <ace/SOCK_Dgram.h> +#include <ace/SOCK_Dgram_Mcast.h> + +#include <ace/Thread_Manager.h> + +#include <ace/RMCast/Stack.h> +#include <ace/RMCast/Protocol.h> + +namespace ACE_RMCast +{ + class Link : public Element + { + public: + Link (Address const& addr); + + virtual void + in_start (In_Element* in); + + virtual void + out_start (Out_Element* out); + + virtual void + in_stop (); + + public: + virtual void + send (Message_ptr m); + + private: + virtual void + send_ (Message_ptr m); + + private: + void + recv (); + + static ACE_THR_FUNC_RETURN + recv_thunk (void* obj); + + private: + Address addr_, self_; + ACE_SOCK_Dgram_Mcast rsock_; + ACE_SOCK_Dgram ssock_; + + ACE_Thread_Manager recv_mgr_; + + // Simulator. + // + Message_ptr hold_; + Mutex mutex_; + }; +} + + +#endif // ACE_RMCAST_LINK_H diff --git a/protocols/ace/RMCast/Protocol.cpp b/protocols/ace/RMCast/Protocol.cpp new file mode 100644 index 00000000000..4082d6bd139 --- /dev/null +++ b/protocols/ace/RMCast/Protocol.cpp @@ -0,0 +1,9 @@ +// file : ace/RMCast/Protocol.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/RMCast/Protocol.h> + +namespace ACE_RMCast +{ +} diff --git a/protocols/ace/RMCast/Protocol.h b/protocols/ace/RMCast/Protocol.h new file mode 100644 index 00000000000..f9f2956920f --- /dev/null +++ b/protocols/ace/RMCast/Protocol.h @@ -0,0 +1,715 @@ +// file : ace/RMCast/Protocol.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_PROTOCOL_H +#define ACE_RMCAST_PROTOCOL_H + +#include <ace/Refcounted_Auto_Ptr.h> + +#include <ace/Vector_T.h> +#include <ace/Hash_Map_Manager.h> + +#include <ace/OS.h> +#include <ace/CDR_Stream.h> +#include <ace/INET_Addr.h> + +#include <ace/RMCast/Bits.h> + +namespace ACE_RMCast +{ + // Basic types. + // + typedef ACE_CDR::UShort u16; + typedef ACE_CDR::ULong u32; + typedef ACE_CDR::ULongLong u64; + + typedef ACE_INET_Addr Address; + + struct AddressHasher + { + unsigned long + operator() (Address const& a) const + { + unsigned long port (a.get_port_number ()); + unsigned long ip (a.get_ip_address ()); + + port <<= sizeof (sizeof (unsigned long) - sizeof (unsigned short)); + + return port ^ ip; + } + }; + + //@@ Provide stream<< (Address const&) + // + + typedef ACE_OutputCDR ostream; + typedef ACE_InputCDR istream; + + struct Profile; + + typedef + ACE_Refcounted_Auto_Ptr<Profile, ACE_Null_Mutex> + Profile_ptr; + + struct Profile + { + public: + class Header + { + public: + Header (u16 id, u16 size) + : id_ (id), size_ (size) + { + } + + Header (istream& is) + { + is >> id_ >> size_; + } + + public: + u16 + id () const + { + return id_; + } + + u16 + size () const + { + return size_; + } + + protected: + void + size (u16 s) + { + size_ = s; + } + + friend class Profile; + + private: + u16 id_; + u16 size_; + }; + + public: + virtual + ~Profile () + { + } + + protected: + Profile (u16 id, u16 size, u16 boundary) + : header_ (id, size), boundary_ (boundary) + { + } + + Profile (Header const& h, u16 boundary) + : header_ (h), boundary_ (boundary) + { + } + public: + u16 + id () const + { + return header_.id (); + } + + u16 + size () const + { + return header_.size (); + } + + u16 + boundary () const + { + return boundary_; + } + + protected: + void + size (u16 s) + { + header_.size (s); + } + + public: + virtual void + serialize_body (ostream&) const = 0; + + friend + ostream& + operator<< (ostream& os, Profile const& p); + + private: + Header header_; + u16 boundary_; + }; + + inline + ostream& + operator<< (ostream& os, Profile::Header const& hdr) + { + os << hdr.id (); + os << hdr.size (); + + return os; + } + + inline + ostream& + operator<< (ostream& os, Profile const& p) + { + os << p.header_; + p.serialize_body (os); + + return os; + } + + // + // + // + + class Message; + + typedef + ACE_Refcounted_Auto_Ptr<Message, ACE_Null_Mutex> + Message_ptr; + + class Message + { + public: + Message () + : profiles_ (4) + { + } + + Message (Message const& m) + : profiles_ (4) + { + for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ()) + { + // Shallow copy of profiles. + // + profiles_.bind ((*i).ext_id_, (*i).int_id_); + } + } + + public: + struct duplicate {}; + + void + add (Profile_ptr p) + { + u16 id (p->id ()); + + if (profiles_.find (id) == 0) + { + throw duplicate (); + } + + profiles_.bind (id, p); + } + + Profile const* + find (u16 id) const + { + Profiles::ENTRY* e; + + if (profiles_.find (id, e) == -1) return 0; + + return e->int_id_.get (); + } + + public: + size_t + size () const + { + size_t s (4); // 4 is for size (u32) + + for (Profiles::const_iterator i (profiles_); !i.done (); i.advance ()) + { + //@@ This is so broken: in CDR the padding depends on + // what comes after. + // + s += s % 2; // Padding to the boundary of 2. + s += 4; // Profile header: u16 + u16 + s += s % (*i).int_id_->boundary (); // Padding to the b. of profile body. + s += (*i).int_id_->size (); // Profile body. + } + + return s; + } + + friend + ostream& + operator<< (ostream& os, Message const& m) + { + u32 s (m.size ()); + + os << s; + + for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ()) + { + os << *((*i).int_id_); + } + + return os; + } + + typedef + ACE_Hash_Map_Manager<u16, Profile_ptr, ACE_Null_Mutex> + Profiles; + + Profiles profiles_; + }; + + typedef + ACE_Vector<Message_ptr> + Messages; + + + // + // + // + struct From; + + typedef + ACE_Refcounted_Auto_Ptr<From, ACE_Null_Mutex> + From_ptr; + + struct From : Profile + { + static u16 const id = 0x0001; + + public: + From (Header const& h, istream& is) + : Profile (h, 4) + { + u32 addr; + u16 port; + + is >> addr; + is >> port; + + address_ = Address (port, addr); + } + + // 6 is CDR-specific. + // + From (Address const& addr) + : Profile (id, 6, 4), address_ (addr) + { + } + + public: + Address const& + address () const + { + return address_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << addr; + os << port; + } + + private: + Address address_; + }; + + + // + // + // + struct To; + + typedef + ACE_Refcounted_Auto_Ptr<To, ACE_Null_Mutex> + To_ptr; + + struct To : Profile + { + static u16 const id = 0x0002; + + public: + To (Header const& h, istream& is) + : Profile (h, 4) + { + u32 addr; + u16 port; + + is >> addr; + is >> port; + + address_ = Address (port, addr); + } + + // 6 is CDR-specific. + // + To (Address const& addr) + : Profile (id, 6, 4), address_ (addr) + { + } + + public: + Address const& + address () const + { + return address_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << addr; + os << port; + } + + private: + Address address_; + }; + + // + // + // + struct Data; + + typedef + ACE_Refcounted_Auto_Ptr<Data, ACE_Null_Mutex> + Data_ptr; + + struct Data : Profile + { + static u16 const id = 0x0003; + + public: + Data (Header const& h, istream& is) + : Profile (h, 1), buf_ (0), size_ (h.size ()) + { + if (size_) + { + buf_ = reinterpret_cast<char*> (operator new (size_)); + is.read_char_array (buf_, size_); + } + + } + + Data (void const* buf, size_t s) + : Profile (id, s, 1), buf_ (0), size_ (s) + { + if (size_) + { + buf_ = reinterpret_cast<char*> (operator new (size_)); + ACE_OS::memcpy (buf_, buf, size_); + } + } + + public: + char const* + buf () const + { + return buf_; + } + + size_t + size () const + { + return size_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + os.write_char_array (buf_, size_); + } + + private: + char* buf_; + size_t size_; + }; + + // + // + // + struct SN; + + typedef + ACE_Refcounted_Auto_Ptr<SN, ACE_Null_Mutex> + SN_ptr; + + struct SN : Profile + { + static u16 const id = 0x0004; + + public: + SN (Header const& h, istream& is) + : Profile (h, 8) + { + is >> n_; + } + + // 8 is CDR-specific. + // + SN (u64 n) + : Profile (id, 8, 8), n_ (n) + { + } + + public: + u64 + num () const + { + return n_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + os << n_; + } + + private: + u64 n_; + }; + + + // + // + // + struct NAK; + + typedef + ACE_Refcounted_Auto_Ptr<NAK, ACE_Null_Mutex> + NAK_ptr; + + struct NAK : Profile + { + static u16 const id = 0x0005; + + typedef + ACE_Vector<u64> + SerialNumbers; + + typedef + SerialNumbers::Iterator + iterator; + + public: + NAK (Header const& h, istream& is) + : Profile (h, 8) + { + //@@ All the numbers are CDR-specific. + // + // 8 = u32 + u16 + 2(padding to u64) + // + for (long i (0); i < ((h.size () - 8) / 8); ++i) + { + u64 sn; + is >> sn; + sns_.push_back (sn); + } + + u32 addr; + u16 port; + + is >> port; + is >> addr; + + address_ = Address (port, addr); + } + + // 8 is CDR-specific. + // + NAK (Address const& src) + : Profile (id, 8, 8), address_ (src) + { + } + + public: + void + add (u64 sn) + { + sns_.push_back (sn); + size (size () + 8); //@@ 8 is CDR-specific + } + + public: + Address const& + address () const + { + return address_; + } + + + iterator + begin () /* const */ + { + return iterator (sns_); + } + + /* + iterator + end () const + { + return sns_.end (); + } + */ + + size_t + count () const + { + return sns_.size (); + } + + public: + virtual void + serialize_body (ostream& os) const + { + NAK& this_ (const_cast<NAK&> (*this)); // Don't put in ROM. + + // Stone age iteration. + // + for (iterator i (this_.begin ()); !i.done (); i.advance ()) + { + u64* psn; + i.next (psn); + os << *psn; + } + + + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << port; + os << addr; + } + + private: + Address address_; + SerialNumbers sns_; + }; + + // + // + // + struct NRTM; + + typedef + ACE_Refcounted_Auto_Ptr<NRTM, ACE_Null_Mutex> + NRTM_ptr; + + struct NRTM : Profile + { + static u16 const id = 0x0006; + + public: + NRTM (Header const& h, istream& is) + : Profile (h, 8), map_ (10) + { + //@@ 16 is CDR-specific. + // + // 16 = u32 + u16 + 2(padding to u64) + u64 + for (u16 i (0); i < (h.size () / 16); ++i) + { + u32 addr; + u16 port; + u64 sn; + + is >> sn; + is >> port; + is >> addr; + + map_.bind (Address (port, addr), sn); + } + } + + NRTM () + : Profile (id, 0, 8), map_ (10) + { + } + + public: + void + insert (Address const& addr, u64 sn) + { + map_.bind (addr, sn); + + size (size () + 16); //@@ 16 is CDR-specific. + } + + u64 + find (Address const& addr) const + { + u64 sn; + + if (map_.find (addr, sn) == -1) return 0; + + return sn; + } + + bool + empty () const + { + return map_.current_size () == 0; + } + + public: + virtual void + serialize_body (ostream& os) const + { + for (Map::const_iterator i (map_), e (map_, 1); i != e; ++i) + { + u32 addr ((*i).ext_id_.get_ip_address ()); + u16 port ((*i).ext_id_.get_port_number ()); + u64 sn ((*i).int_id_); + + os << sn; + os << port; + os << addr; + } + } + + private: + typedef + ACE_Hash_Map_Manager_Ex<Address, + u64, + AddressHasher, + ACE_Equal_To<Address>, + ACE_Null_Mutex> + Map; + + Map map_; + }; + +} + +/* +inline +std::ostream& +operator<< (std::ostream& os, RMCast::Address const& a) +{ + char buf[64]; + a.addr_to_string (buf, 64, 1); + return os << buf; +} +*/ + + +#endif // ACE_RMCAST_PROTOCOL_H diff --git a/protocols/ace/RMCast/RMCast.mpc b/protocols/ace/RMCast/RMCast.mpc new file mode 100644 index 00000000000..5ba1bd21b2d --- /dev/null +++ b/protocols/ace/RMCast/RMCast.mpc @@ -0,0 +1,7 @@ +// -*- MPC -*- +// $Id$ + +project(RMCast) : acelib, core { + sharedname = ACE_RMCast + dynamicflags = ACE_RMCAST_BUILD_DLL +} diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp new file mode 100644 index 00000000000..e04cec2f689 --- /dev/null +++ b/protocols/ace/RMCast/Retransmit.cpp @@ -0,0 +1,132 @@ +// file : ace/RMCast/Retransmit.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/OS.h> + +#include <ace/RMCast/Retransmit.h> + +namespace ACE_RMCast +{ + ACE_Time_Value const tick (0, 50000); + + Retransmit:: + Retransmit () + { + } + + void Retransmit:: + out_start (Out_Element* out) + { + Element::out_start (out); + + tracker_mgr_.spawn (track_thunk, this); + } + + void Retransmit:: + out_stop () + { + tracker_mgr_.cancel_all (1); + tracker_mgr_.wait (); + + Element::out_stop (); + } + + void Retransmit:: + send (Message_ptr m) + { + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); + + Lock l (mutex_); + + queue_.bind (sn, Descr (new Data (*data))); + } + + out_->send (m); + } + + void Retransmit:: + recv (Message_ptr m) + { + if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); + + if (nak->address () == to) + { + Lock l (mutex_); + + for (NAK::iterator j (const_cast<NAK*> (nak)->begin ()); + !j.done (); + j.advance ()) + { + u64* psn; + j.next (psn); + + + Queue::ENTRY* pair; + + if (queue_.find (*psn, pair) == 0) + { + //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl; + + Message_ptr m (new Message); + + m->add (Profile_ptr (new SN (pair->ext_id_))); + m->add (pair->int_id_.data ()); + + pair->int_id_.reset (); + + out_->send (m); + } + else + { + //@@ TODO: message aging + // + //cerr << 4 << "message " << *psn << " not available" << endl; + } + } + } + } + else + { + in_->recv (m); + } + } + + ACE_THR_FUNC_RETURN Retransmit:: + track_thunk (void* obj) + { + reinterpret_cast<Retransmit*> (obj)->track (); + return 0; + } + + void Retransmit:: + track () + { + while (true) + { + { + Lock l (mutex_); + + for (Queue::iterator i (queue_); !i.done ();) + { + if ((*i).int_id_.inc () >= 60) + { + u64 sn ((*i).ext_id_); + i.advance (); + queue_.unbind (sn); + } + else + { + i.advance (); + } + } + } + + ACE_OS::sleep (tick); + } + } +} diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h new file mode 100644 index 00000000000..6348ba66009 --- /dev/null +++ b/protocols/ace/RMCast/Retransmit.h @@ -0,0 +1,97 @@ +// file : ace/RMCast/Retransmit.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_RETRANSMIT_H +#define ACE_RMCAST_RETRANSMIT_H + +#include <ace/Hash_Map_Manager.h> + +#include <ace/Thread_Manager.h> + +#include <ace/RMCast/Stack.h> +#include <ace/RMCast/Protocol.h> +#include <ace/RMCast/Bits.h> + +namespace ACE_RMCast +{ + class Retransmit : public Element + { + public: + Retransmit (); + + virtual void + out_start (Out_Element* out); + + virtual void + out_stop (); + + public: + virtual void + send (Message_ptr m); + + virtual void + recv (Message_ptr m); + + private: + struct Descr + { + // Shouldn't be available but ACE_Hash_Map needs it. + // + Descr () + : data_ (), count_ (0) + { + } + + Descr (Data_ptr d) + : data_ (d), count_ (0) + { + } + + unsigned long + inc () + { + return ++count_; + } + + void + reset () + { + count_ = 0; + } + + // It would be logical to return data_ptr but ACE ref_auto_ptr + // hasn't learned how to convert between pointers yet. + // + Profile_ptr + data () const + { + return Profile_ptr (new Data (*data_)); + } + + private: + Data_ptr data_; + unsigned long count_; + }; + + typedef + ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> + Queue; + + private: + void + track (); + + static ACE_THR_FUNC_RETURN + track_thunk (void* obj); + + private: + Queue queue_; + Mutex mutex_; + + ACE_Thread_Manager tracker_mgr_; + }; +} + + +#endif // ACE_RMCAST_RETRANSMIT_H diff --git a/protocols/ace/RMCast/Simulator.cpp b/protocols/ace/RMCast/Simulator.cpp new file mode 100644 index 00000000000..4b08f37f3da --- /dev/null +++ b/protocols/ace/RMCast/Simulator.cpp @@ -0,0 +1,40 @@ +// file : ace/RMCast/Simulator.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/RMCast/Simulator.h> + +namespace ACE_RMCast +{ + Simulator:: + Simulator () + { + srand (time (0)); + } + + void Simulator:: + send (Message_ptr m) + { + // Note: Simulator may work in unpredictable ways mainly due + // to the "reliable loopback" mechanism. + // + out_->send (m); + return; + + int r (rand ()); + + if ((r % 3) == 0) return; + + Lock l (mutex_); + + if (hold_.get ()) + { + out_->send (m); + out_->send (hold_); + } + else + { + hold_ = m; + } + } +} diff --git a/protocols/ace/RMCast/Simulator.h b/protocols/ace/RMCast/Simulator.h new file mode 100644 index 00000000000..9e19f26bb8b --- /dev/null +++ b/protocols/ace/RMCast/Simulator.h @@ -0,0 +1,30 @@ +// file : ace/RMCast/Simulator.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_SIMULATOR_H +#define ACE_RMCAST_SIMULATOR_H + +#include <ace/RMCast/Stack.h> +#include <ace/RMCast/Protocol.h> +#include <ace/RMCast/Bits.h> + +namespace ACE_RMCast +{ + class Simulator : public Element + { + public: + Simulator (); + + public: + virtual void + send (Message_ptr m); + + private: + Message_ptr hold_; + Mutex mutex_; + }; +} + + +#endif // ACE_RMCAST_SIMULATOR_H diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp new file mode 100644 index 00000000000..640a1a7d694 --- /dev/null +++ b/protocols/ace/RMCast/Socket.cpp @@ -0,0 +1,112 @@ +// file : ace/RMCast/Socket.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/OS.h> + +#include <ace/RMCast/Socket.h> + +namespace ACE_RMCast +{ + Socket:: + Socket (Address const& a, bool loop) + : loop_ (loop), sn_ (1), cond_ (mutex_) + { + acknowledge_ = auto_ptr<Acknowledge> (new Acknowledge ()); + retransmit_ = auto_ptr<Retransmit> (new Retransmit ()); + simulator_ = auto_ptr<Simulator> (new Simulator ()); + link_ = auto_ptr<Link> (new Link (a)); + + // Start IN stack from top to bottom. + // + in_start (0); + acknowledge_->in_start (this); + retransmit_->in_start (acknowledge_.get ()); + simulator_->in_start (retransmit_.get ()); + link_->in_start (simulator_.get ()); + + // Start OUT stack from bottom up. + // + link_->out_start (0); + simulator_->out_start (link_.get ()); + retransmit_->out_start (simulator_.get ()); + acknowledge_->out_start (retransmit_.get ()); + out_start (acknowledge_.get ()); + } + + Socket:: + ~Socket () + { + // Stop OUT stack from top to bottom. + // + out_stop (); + acknowledge_->out_stop (); + retransmit_->out_stop (); + simulator_->out_stop (); + link_->out_stop (); + + // Stop IN stack from bottom up. + // + link_->in_stop (); + simulator_->in_stop (); + retransmit_->in_stop (); + acknowledge_->in_stop (); + in_stop (); + } + + + void Socket:: + send (void const* buf, size_t s) + { + Message_ptr m (new Message); + + m->add (Profile_ptr (new SN (sn_++))); + m->add (Profile_ptr (new Data (buf, s))); + + send (m); + } + + size_t Socket:: + recv (void* buf, size_t s) + { + Lock l (mutex_); + + while (queue_.is_empty ()) cond_.wait (); + + Message_ptr m; + if (queue_.dequeue_head (m) == -1) abort (); + + Data const* d (static_cast<Data const*>(m->find (Data::id))); + + size_t r (d->size () < s ? d->size () : s); + + ACE_OS::memcpy (buf, d->buf (), r); + + return r; + } + + void Socket:: + recv (Message_ptr m) + { + if (m->find (Data::id) != 0) + { + if (!loop_) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); + + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + if (to == from) return; + } + + Lock l (mutex_); + + bool signal (queue_.is_empty ()); + + queue_.enqueue_tail (m); + + if (signal) cond_.signal (); + } + } +} diff --git a/protocols/ace/RMCast/Stack.cpp b/protocols/ace/RMCast/Stack.cpp new file mode 100644 index 00000000000..729e289cf0e --- /dev/null +++ b/protocols/ace/RMCast/Stack.cpp @@ -0,0 +1,9 @@ +// file : ace/RMCast/Stack.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <ace/RMCast/Stack.h> + +namespace ACE_RMCast +{ +} diff --git a/protocols/ace/RMCast/Stack.h b/protocols/ace/RMCast/Stack.h new file mode 100644 index 00000000000..e371370977d --- /dev/null +++ b/protocols/ace/RMCast/Stack.h @@ -0,0 +1,87 @@ +// file : ace/RMCast/Stack.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_STACK_H +#define ACE_RMCAST_STACK_H + +#include <ace/RMCast/Protocol.h> + +namespace ACE_RMCast +{ + struct Out_Element + { + virtual + ~Out_Element () + { + } + + Out_Element () + : out_ (0) + { + } + + virtual void + out_start (Out_Element* out) + { + out_ = out; + } + + virtual void + send (Message_ptr m) + { + if (out_) out_->send (m); + } + + virtual void + out_stop () + { + out_ = 0; + } + + protected: + Out_Element* out_; + }; + + + struct In_Element + { + virtual + ~In_Element () + { + } + + In_Element () + : in_ (0) + { + } + + virtual void + in_start (In_Element* in) + { + in_ = in; + } + + virtual void + recv (Message_ptr m) + { + if (in_) in_->recv (m); + } + + virtual void + in_stop () + { + in_ = 0; + } + + protected: + In_Element* in_; + }; + + struct Element : In_Element, Out_Element + { + }; + +} + +#endif // ACE_RMCAST_STACK_H |