diff options
author | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-06-17 15:36:33 +0000 |
---|---|---|
committer | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-06-17 15:36:33 +0000 |
commit | 881ae35060cbb9b85f17de8dd7a63e36a7791fc3 (patch) | |
tree | 19cdd4bc92934d16e4268e17a64488cbac2dc79c /protocols | |
parent | a9c5fe413e5d703a989f840f940ba083dc894d22 (diff) | |
download | ATCD-881ae35060cbb9b85f17de8dd7a63e36a7791fc3.tar.gz |
ChangeLogTag:Fri Jun 17 17:22:13 2005 Boris Kolpackov <boris@kolpackov.net>
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.cpp | 110 | ||||
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.h | 9 | ||||
-rw-r--r-- | protocols/ace/RMCast/Fragment.cpp | 59 | ||||
-rw-r--r-- | protocols/ace/RMCast/Fragment.h | 25 | ||||
-rw-r--r-- | protocols/ace/RMCast/Link.cpp | 33 | ||||
-rw-r--r-- | protocols/ace/RMCast/Protocol.cpp | 1 | ||||
-rw-r--r-- | protocols/ace/RMCast/Protocol.h | 435 | ||||
-rw-r--r-- | protocols/ace/RMCast/Reassemble.cpp | 119 | ||||
-rw-r--r-- | protocols/ace/RMCast/Reassemble.h | 39 | ||||
-rw-r--r-- | protocols/ace/RMCast/Retransmit.cpp | 18 | ||||
-rw-r--r-- | protocols/ace/RMCast/Retransmit.h | 23 | ||||
-rw-r--r-- | protocols/ace/RMCast/Simulator.h | 4 | ||||
-rw-r--r-- | protocols/ace/RMCast/Socket.cpp | 38 | ||||
-rw-r--r-- | protocols/ace/RMCast/Socket.h | 2 | ||||
-rw-r--r-- | protocols/examples/RMCast/Send_Msg/Protocol.h | 2 | ||||
-rw-r--r-- | protocols/tests/RMCast/Protocol.h | 2 | ||||
-rw-r--r-- | protocols/tests/RMCast/Sender.cpp | 2 |
17 files changed, 801 insertions, 120 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp index 12a857f1c53..1056d6a3281 100644 --- a/protocols/ace/RMCast/Acknowledge.cpp +++ b/protocols/ace/RMCast/Acknowledge.cpp @@ -2,12 +2,18 @@ // author : Boris Kolpackov <boris@kolpackov.net> // cvs-id : $Id$ -#include <ace/Time_Value.h> // ACE_Time_Value -#include <ace/OS_NS_unistd.h> -#include <ace/OS_NS_sys_time.h> // gettimeofday +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_time.h" // gettimeofday #include "Acknowledge.h" +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + namespace ACE_RMCast { ACE_Time_Value const tick (0, 5000); @@ -108,7 +114,9 @@ namespace ACE_RMCast // Send NRTM. // - Profile_ptr nrtm (create_nrtm ()); + u32 max_elem (NRTM::max_count (max_payload_size)); + + Profile_ptr nrtm (create_nrtm (max_elem)); if (nrtm.get ()) { @@ -157,48 +165,66 @@ namespace ACE_RMCast void Acknowledge:: track_queue (Address const& addr, Queue& q, Messages& msgs) { - NAK_ptr nak (new NAK (addr)); + u32 max_elem (NAK::max_count (max_payload_size)); + u32 count (0); + + Queue::iterator i (q.begin ()), e (q.end ()); // Track existing losses. // - for (Queue::iterator i (q.begin ()), e (q.end ()); i != e; ++i) + while (i != e) { - u64 sn ((*i).ext_id_); - Descr& d = (*i).int_id_; + NAK_ptr nak (new NAK (addr)); - if (d.lost ()) + // Inner loop that fills NAK profile with up to max_elem elements. + // + for (; i != e && nak->count () < max_elem; ++i) { - d.timer (d.timer () - 1); + u64 sn ((*i).ext_id_); + Descr& d = (*i).int_id_; - if (d.timer () == 0) + if (d.lost ()) { - //@@ Need exp fallback. - // - d.nak_count (d.nak_count () + 1); - d.timer ((d.nak_count () + 1) * nak_timeout); + d.timer (d.timer () - 1); - nak->add (sn); + if (d.timer () == 0) + { + //@@ Need exp fallback. + // + d.nak_count (d.nak_count () + 1); + d.timer ((d.nak_count () + 1) * nak_timeout); - //cerr << 6 << "NAK # " << d.nak_count () << ": " - // << addr << " " << sn << endl; + nak->add (sn); + + ++count; + + //cerr << 6 << "NAK # " << d.nak_count () << ": " + // << addr << " " << sn << endl; + } } } - } - // Send NAK. - // - if (nak->count ()) - { - // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" - // << endl; + // Send this NAK. + // + if (nak->count ()) + { + // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" + // << endl; - Message_ptr m (new Message); + Message_ptr m (new Message); - m->add (Profile_ptr (nak.release ())); + m->add (Profile_ptr (nak.release ())); - msgs.push_back (m); + msgs.push_back (m); + } } + /* + if (count > max_elem) + cerr << "NAC count : " << count << endl + << "NAK max : " << max_elem << endl; + */ + // Detect and record new losses. // for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) @@ -256,6 +282,8 @@ namespace ACE_RMCast // First message from this source. // hold_.bind (from, Queue (sn)); + //@@ rm + // hold_.find (from, e); in_->recv (m); @@ -300,13 +328,20 @@ namespace ACE_RMCast void Acknowledge:: send (Message_ptr m) { - if (m->find (Data::id) != 0) + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) { - Lock l (mutex_); + u32 max_size (max_payload_size - data->size ()); + u32 max_elem (NRTM::max_count (max_size)); - Profile_ptr nrtm (create_nrtm ()); + if (max_elem > 0) + { + Lock l (mutex_); + + Profile_ptr nrtm (create_nrtm (max_elem)); - if (nrtm.get ()) m->add (nrtm); + if (nrtm.get ()) + m->add (nrtm); + } nrtm_timer_ = nrtm_timeout; // Reset timer. } @@ -315,7 +350,7 @@ namespace ACE_RMCast } Profile_ptr Acknowledge:: - create_nrtm () + create_nrtm (u32 max_elem) { // Prepare NRTM. // @@ -332,11 +367,16 @@ namespace ACE_RMCast //@@ Should look for the highest known number. // nrtm->insert (addr, q.sn ()); + + if (--max_elem == 0) + break; } } - if (nrtm->empty ()) return Profile_ptr (0); - else return Profile_ptr (nrtm.release ()); + if (nrtm->empty ()) + return Profile_ptr (0); + else + return Profile_ptr (nrtm.release ()); } ACE_THR_FUNC_RETURN Acknowledge:: diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h index 5726f07d0d3..ec0ff77d12b 100644 --- a/protocols/ace/RMCast/Acknowledge.h +++ b/protocols/ace/RMCast/Acknowledge.h @@ -5,8 +5,8 @@ #ifndef ACE_RMCAST_ACKNOWLEDGE_H #define ACE_RMCAST_ACKNOWLEDGE_H -#include <ace/Hash_Map_Manager.h> -#include <ace/Thread_Manager.h> +#include "ace/Hash_Map_Manager.h" +#include "ace/Thread_Manager.h" #include "Stack.h" #include "Protocol.h" @@ -217,7 +217,7 @@ namespace ACE_RMCast track_queue (Address const& addr, Queue& q, Messages& msgs); Profile_ptr - create_nrtm (); + create_nrtm (u32 max_elem); static ACE_THR_FUNC_RETURN track_thunk (void* obj); @@ -232,9 +232,6 @@ namespace ACE_RMCast bool stop_; ACE_Thread_Manager tracker_mgr_; }; - - - } #endif // ACE_RMCAST_ACKNOWLEDGE_H diff --git a/protocols/ace/RMCast/Fragment.cpp b/protocols/ace/RMCast/Fragment.cpp new file mode 100644 index 00000000000..6a0c203a133 --- /dev/null +++ b/protocols/ace/RMCast/Fragment.cpp @@ -0,0 +1,59 @@ +// file : ace/RMCast/Fragment.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Fragment.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Fragment:: + Fragment () + { + } + + void Fragment:: + send (Message_ptr m) + { + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + if (data->size () <= max_payload_size) + { + out_->send (m); + return; + } + + char const* p (data->buf ()); + size_t size (data->size ()); + + // Need fragmentation. + // + u32 packets (size / max_payload_size + (size % max_payload_size ? 1 : 0)); + + // cerr << "size : " << size << endl + // << "packs: " << packets << endl; + + + for (u32 i (1); i <= packets; ++i) + { + Message_ptr part (new Message); + + size_t s (i == packets ? size % max_payload_size : max_payload_size); + + // cerr << "pack: " << s << endl; + + part->add (Profile_ptr (new Part (i, packets, size))); + part->add (Profile_ptr (new Data (p, s))); + + out_->send (part); + + p += s; + } + } + } +} diff --git a/protocols/ace/RMCast/Fragment.h b/protocols/ace/RMCast/Fragment.h new file mode 100644 index 00000000000..836307e71a8 --- /dev/null +++ b/protocols/ace/RMCast/Fragment.h @@ -0,0 +1,25 @@ +// file : ace/RMCast/Fragment.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_FRAGMENT_H +#define ACE_RMCAST_FRAGMENT_H + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" + +namespace ACE_RMCast +{ + class Fragment : public Element + { + public: + Fragment (); + + public: + virtual void + send (Message_ptr m); + }; +} + +#endif // ACE_RMCAST_FRAGMENT_H diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp index 7e8cab77845..6bfcf7085dd 100644 --- a/protocols/ace/RMCast/Link.cpp +++ b/protocols/ace/RMCast/Link.cpp @@ -2,8 +2,8 @@ // author : Boris Kolpackov <boris@kolpackov.net> // cvs-id : $Id$ -#include <ace/Time_Value.h> // ACE_Time_Value -#include <ace/OS_NS_sys_socket.h> +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_sys_socket.h" #include "Link.h" @@ -118,12 +118,19 @@ namespace ACE_RMCast } else { - hold_ = m; + if ((rand () % 5) != 0) + { + send_ (m); + } + else + { + hold_ = m; - // Make a copy in M so that the reliable loop below - // won't add FROM and TO to HOLD_. - // - m = Message_ptr (new Message (*hold_)); + // Make a copy in M so that the reliable loop below + // won't add FROM and TO to HOLD_. + // + m = hold_->clone (); + } } } } @@ -145,6 +152,14 @@ namespace ACE_RMCast os << *m; + if (os.length () > max_packet_size) + { + ACE_ERROR ((LM_ERROR, + "packet length (%d) exceeds max_poacket_size (%d)\n", + os.length (), max_packet_size)); + abort (); + } + ssock_.send (os.buffer (), os.length (), addr_); /* @@ -284,6 +299,10 @@ namespace ACE_RMCast { m->add (Profile_ptr (new NoData (hdr, is))); } + else if (id == Part::id) + { + m->add (Profile_ptr (new Part (hdr, is))); + } else { //cerr << 0 << "unknown profile id " << hdr.id () << endl; diff --git a/protocols/ace/RMCast/Protocol.cpp b/protocols/ace/RMCast/Protocol.cpp index afa60396e31..755872371cb 100644 --- a/protocols/ace/RMCast/Protocol.cpp +++ b/protocols/ace/RMCast/Protocol.cpp @@ -13,4 +13,5 @@ namespace ACE_RMCast u16 const NAK:: id = 0x0005; u16 const NRTM:: id = 0x0006; u16 const NoData::id = 0x0007; + u16 const Part:: id = 0x0008; } diff --git a/protocols/ace/RMCast/Protocol.h b/protocols/ace/RMCast/Protocol.h index 82df32e467f..7df54fd5f88 100644 --- a/protocols/ace/RMCast/Protocol.h +++ b/protocols/ace/RMCast/Protocol.h @@ -29,6 +29,17 @@ namespace ACE_RMCast typedef ACE_CDR::ULong u32; typedef ACE_CDR::ULongLong u64; + // Protocol parameters + // + // + u32 const max_packet_size = 1460; // MTU (1500) - IP-header - UDP-header + u32 const max_service_size = 40; // service profiles (Part, SN, etc) sizes + // plus message size. + u32 const max_payload_size = max_packet_size - max_service_size; + + // + // + // typedef ACE_INET_Addr Address; struct AddressHasher @@ -107,6 +118,12 @@ namespace ACE_RMCast { } + Profile_ptr + clone () + { + return clone_ (); + } + protected: Profile (u16 id) : header_ (id, 0) @@ -117,6 +134,14 @@ namespace ACE_RMCast : header_ (h) { } + + virtual Profile_ptr + clone_ () = 0; + + private: + Profile& + operator= (Profile const&); + public: u16 id () const @@ -206,10 +231,10 @@ namespace ACE_RMCast return ss; } + // // // - class Message; typedef @@ -224,17 +249,29 @@ namespace ACE_RMCast { } + Message_ptr + clone () + { + return new Message (*this); + } + + protected: Message (Message const& m) : profiles_ (4) { for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ()) { - // Shallow copy of profiles. + // Shallow copy of profiles. This implies that profiles are not + // modified as they go up/down the stack. // profiles_.bind ((*i).ext_id_, (*i).int_id_); } } + private: + Message& + operator= (Message const&); + public: bool add (Profile_ptr p) @@ -251,6 +288,18 @@ namespace ACE_RMCast return true; } + void + replace (Profile_ptr p) + { + profiles_.rebind (p->id (), p); + } + + void + remove (u16 id) + { + profiles_.unbind (id); + } + Profile const* find (u16 id) const { @@ -336,6 +385,25 @@ namespace ACE_RMCast size (calculate_size ()); } + From_ptr + clone () + { + return From_ptr (static_cast<From*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new From (*this); + } + + From (From const& from) + : Profile (from), + address_ (from.address_) + { + } + public: Address const& address () const @@ -401,6 +469,25 @@ namespace ACE_RMCast size (calculate_size ()); } + To_ptr + clone () + { + return To_ptr (static_cast<To*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new To (*this); + } + + To (To const& to) + : Profile (to), + address_ (to.address_) + { + } + public: Address const& address () const @@ -449,27 +536,61 @@ namespace ACE_RMCast public: Data (Header const& h, istream& is) - : Profile (h), buf_ (0), size_ (h.size ()) + : Profile (h), + buf_ (0), + size_ (h.size ()), + capacity_ (size_) { if (size_) { - buf_ = reinterpret_cast<char*> (operator new (size_)); + buf_ = reinterpret_cast<char*> (operator new (capacity_)); is.read_char_array (buf_, size_); } } - Data (void const* buf, size_t s) - : Profile (id), buf_ (0), size_ (s) + Data (void const* buf, size_t s, size_t capacity = 0) + : Profile (id), + buf_ (0), + size_ (s), + capacity_ (capacity < size_ ? size_ : capacity) { if (size_) { - buf_ = reinterpret_cast<char*> (operator new (size_)); + buf_ = reinterpret_cast<char*> (operator new (capacity_)); ACE_OS::memcpy (buf_, buf, size_); } Profile::size (calculate_size ()); } + Data_ptr + clone () + { + return Data_ptr (static_cast<Data*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new Data (*this); + } + + Data (Data const& d) + : Profile (d), + buf_ (0), + size_ (d.size_), + capacity_ (d.capacity_) + { + if (size_) + { + buf_ = reinterpret_cast<char*> (operator new (capacity_)); + ACE_OS::memcpy (buf_, d.buf_, size_); + } + + Profile::size (calculate_size ()); + } + public: char const* buf () const @@ -477,12 +598,35 @@ namespace ACE_RMCast return buf_; } + char* + buf () + { + return buf_; + } + size_t size () const { return size_; } + void + size (size_t s) + { + if (s > capacity_) + abort (); + + size_ = s; + + Profile::size (calculate_size ()); + } + + size_t + capacity () const + { + return capacity_; + } + public: virtual void serialize_body (ostream& os) const @@ -499,6 +643,7 @@ namespace ACE_RMCast private: char* buf_; size_t size_; + size_t capacity_; }; @@ -528,6 +673,25 @@ namespace ACE_RMCast size (calculate_size ()); } + SN_ptr + clone () + { + return SN_ptr (static_cast<SN*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new SN (*this); + } + + SN (SN const& sn) + : Profile (sn), + n_ (sn.n_) + { + } + public: u64 num () const @@ -590,6 +754,10 @@ namespace ACE_RMCast size_t addr_size (ss.total_length ()); + + is >> addr; + is >> port; + // num_of_sns = (size - addr_size) / sn_size // for (unsigned long i (0); i < ((h.size () - addr_size) / sn_size); ++i) @@ -598,9 +766,6 @@ namespace ACE_RMCast sns_.push_back (sn); } - is >> addr; - is >> port; - address_ = Address (port, addr); } @@ -611,6 +776,26 @@ namespace ACE_RMCast size (calculate_size ()); } + NAK_ptr + clone () + { + return NAK_ptr (static_cast<NAK*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new NAK (*this); + } + + NAK (NAK const& nak) + : Profile (nak), + address_ (nak.address_), + sns_ (nak.sns_) + { + } + public: void add (u64 sn) @@ -648,11 +833,50 @@ namespace ACE_RMCast } public: + // Count max number of elements that will fit into NAK profile + // with size <= max_size. + // + static u32 + max_count (u32 max_size) + { + u32 n (0); + + sstream ss; + + Profile::Header hdr (0, 0); + ss << hdr; + + u32 addr (0); + u16 port (0); + ss << addr << port; + + while (true) + { + u64 sn (0); + ss << sn; + + if (ss.total_length () <= max_size) + ++n; + + if (ss.total_length () >= max_size) + break; + } + + return n; + } + + public: virtual void serialize_body (ostream& os) const { NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM. + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << addr; + os << port; + // Stone age iteration. // for (iterator i (this_.begin ()); !i.done (); i.advance ()) @@ -661,13 +885,6 @@ namespace ACE_RMCast i.next (psn); os << *psn; } - - - u32 addr (address_.get_ip_address ()); - u16 port (address_.get_port_number ()); - - os << addr; - os << port; } virtual void @@ -675,6 +892,12 @@ namespace ACE_RMCast { NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM. + u32 addr (0); + u16 port (0); + + ss << addr; + ss << port; + // Stone age iteration. // for (iterator i (this_.begin ()); !i.done (); i.advance ()) @@ -682,13 +905,6 @@ namespace ACE_RMCast u64 sn (0); ss << sn; } - - - u32 addr (0); - u16 port (0); - - ss << addr; - ss << port; } private: @@ -734,7 +950,6 @@ namespace ACE_RMCast is >> addr; is >> port; - map_.bind (Address (port, addr), sn); } } @@ -745,6 +960,28 @@ namespace ACE_RMCast size (calculate_size ()); } + NRTM_ptr + clone () + { + return NRTM_ptr (static_cast<NRTM*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new NRTM (*this); + } + + NRTM (NRTM const& nrtm) + : Profile (nrtm) + { + for (Map::const_iterator i (nrtm.map_); !i.done (); i.advance ()) + { + map_.bind ((*i).ext_id_, (*i).int_id_); + } + } + public: void insert (Address const& addr, u64 sn) @@ -771,6 +1008,40 @@ namespace ACE_RMCast } public: + // Count max number of elements that will fit into NRTM profile + // with size <= max_size. + // + static u32 + max_count (u32 max_size) + { + u32 n (0); + + sstream ss; + + Profile::Header hdr (0, 0); + ss << hdr; + + while (true) + { + u32 addr (0); + u16 port (0); + u64 sn (0); + + ss << sn; + ss << addr; + ss << port; + + if (ss.total_length () <= max_size) + ++n; + + if (ss.total_length () >= max_size) + break; + } + + return n; + } + + public: virtual void serialize_body (ostream& os) const { @@ -783,6 +1054,7 @@ namespace ACE_RMCast os << sn; os << addr; os << port; + } } @@ -839,6 +1111,24 @@ namespace ACE_RMCast Profile::size (0); } + NoData_ptr + clone () + { + return NoData_ptr (static_cast<NoData*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new NoData (*this); + } + + NoData (NoData const& no_data) + : Profile (no_data) + { + } + public: virtual void serialize_body (ostream&) const @@ -851,6 +1141,101 @@ namespace ACE_RMCast } }; + // + // + // + struct Part; + + typedef + ACE_Refcounted_Auto_Ptr<Part, ACE_Null_Mutex> + Part_ptr; + + struct Part : Profile + { + static u16 const id; + + public: + Part (Header const& h, istream& is) + : Profile (h) + { + is >> num_; + is >> of_; + is >> total_size_; + } + + Part (u32 num, u32 of, u64 total_size) + : Profile (id), + num_ (num), + of_ (of), + total_size_ (total_size) + { + size (calculate_size ()); + } + + Part_ptr + clone () + { + return Part_ptr (static_cast<Part*> (clone_ ().release ())); + } + + protected: + virtual Profile_ptr + clone_ () + { + return new Part (*this); + } + + Part (Part const& part) + : Profile (part), + num_ (part.num_), + of_ (part.of_), + total_size_ (part.total_size_) + { + } + + public: + u32 + num () const + { + return num_; + } + + u32 + of () const + { + return of_; + } + + u64 + total_size () const + { + return total_size_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + os << num_; + os << of_; + os << total_size_; + } + + virtual void + serialize_body (sstream& ss) const + { + ss << num_; + ss << of_; + ss << total_size_; + } + + + private: + u32 num_; + u32 of_; + u64 total_size_; + }; + } /* diff --git a/protocols/ace/RMCast/Reassemble.cpp b/protocols/ace/RMCast/Reassemble.cpp new file mode 100644 index 00000000000..a089fb45c03 --- /dev/null +++ b/protocols/ace/RMCast/Reassemble.cpp @@ -0,0 +1,119 @@ +// file : ace/RMCast/Reassemble.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Reassemble.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Reassemble:: + Reassemble () + { + } + + void Reassemble:: + recv (Message_ptr m) + { + Map::ENTRY* e; + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + if (Part const* part = static_cast<Part const*> (m->find (Part::id))) + { + if (map_.find (from, e) == -1) + { + // First part of the message. + // + + if (part->num () != 1) + { + // We assume that we received NoData for one of the preceding + // fragments. Ignore this one. + return; + } + + Data_ptr new_data (new Data (data->buf (), + data->size (), + part->total_size ())); + + //std::cerr << "part->total_size (): " << part->total_size () << endl; + + map_.bind (from, new_data); + } + else + { + // Next part of the message. + // + + if (part->num () == 1) + abort (); + + Data const* data (static_cast<Data const*> (m->find (Data::id))); + + Data_ptr& new_data (e->int_id_); + + ACE_OS::memcpy (new_data->buf () + new_data->size (), + data->buf (), + data->size ()); + + //std::cerr << "data->size (): " << data->size () << endl + // << "new_data->size (): " << new_data->size () << endl + // << "new_data->capa (): " << new_data->capacity () << endl; + + new_data->size (new_data->size () + data->size ()); + + + if (part->num () == part->of ()) + { + // Reassembly is complete. + // + if (part->total_size () != new_data->size ()) + abort (); + + Message_ptr new_msg (new Message ()); + + Address to ( + static_cast<To const*> (m->find (To::id))->address ()); + + new_msg->add (Profile_ptr (new To (to))); + new_msg->add (Profile_ptr (new From (from))); + new_msg->add (Profile_ptr (new_data.release ())); + + map_.unbind (from); + + in_->recv (new_msg); + } + } + } + else + { + // Non-fragmented message. Make sure we are in the consistent state + // and forward it up. + // + if (map_.find (from, e) != -1) + abort (); + + in_->recv (m); + } + } + else if (m->find (NoData::id) != 0) + { + if (map_.find (from, e) != -1) + { + // We already received some fragments. Clean everyhting up. + // + map_.unbind (from); + } + + in_->recv (m); + } + } +} diff --git a/protocols/ace/RMCast/Reassemble.h b/protocols/ace/RMCast/Reassemble.h new file mode 100644 index 00000000000..0f074c9855c --- /dev/null +++ b/protocols/ace/RMCast/Reassemble.h @@ -0,0 +1,39 @@ +// file : ace/RMCast/Reassemble.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_REASSEMBLE_H +#define ACE_RMCAST_REASSEMBLE_H + +#include "ace/Hash_Map_Manager.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" + +namespace ACE_RMCast +{ + class Reassemble : public Element + { + public: + Reassemble (); + + public: + virtual void + recv (Message_ptr m); + + private: + typedef + ACE_Hash_Map_Manager_Ex<Address, + Data_ptr, + AddressHasher, + ACE_Equal_To<Address>, + ACE_Null_Mutex> + Map; + + Map map_; + }; +} + + +#endif // ACE_RMCAST_REASSEMBLE_H diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp index 609ec526294..7850df9c6f3 100644 --- a/protocols/ace/RMCast/Retransmit.cpp +++ b/protocols/ace/RMCast/Retransmit.cpp @@ -14,7 +14,9 @@ namespace ACE_RMCast Retransmit:: Retransmit () - : cond_ (mutex_), stop_ (false) + : cond_ (mutex_), + sn_ (1), + stop_ (false) { } @@ -43,13 +45,12 @@ namespace ACE_RMCast void Retransmit:: send (Message_ptr m) { - if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + if (m->find (Data::id) != 0) { - u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); + m->add (Profile_ptr (new SN (sn_))); Lock l (mutex_); - - queue_.bind (sn, Descr (Data_ptr (new Data (*data)))); + queue_.bind (sn_++, Descr (m->clone ())); } out_->send (m); @@ -73,8 +74,7 @@ namespace ACE_RMCast u64* psn; j.next (psn); - Message_ptr m (new Message); - m->add (Profile_ptr (new SN (*psn))); + Message_ptr m; Queue::ENTRY* pair; @@ -82,7 +82,7 @@ namespace ACE_RMCast { //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl; - m->add (pair->int_id_.data ()); + m = pair->int_id_.message (); pair->int_id_.reset (); } @@ -90,6 +90,8 @@ namespace ACE_RMCast { //cerr << 4 << "message " << *psn << " not available" << endl; + m = Message_ptr (new Message); + m->add (Profile_ptr (new SN (*psn))); m->add (Profile_ptr (new NoData)); } diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h index fa910ee2837..c76299a8d5b 100644 --- a/protocols/ace/RMCast/Retransmit.h +++ b/protocols/ace/RMCast/Retransmit.h @@ -5,8 +5,8 @@ #ifndef ACE_RMCAST_RETRANSMIT_H #define ACE_RMCAST_RETRANSMIT_H -#include <ace/Hash_Map_Manager.h> -#include <ace/Thread_Manager.h> +#include "ace/Hash_Map_Manager.h" +#include "ace/Thread_Manager.h" #include "Stack.h" #include "Protocol.h" @@ -38,12 +38,12 @@ namespace ACE_RMCast // Shouldn't be available but ACE_Hash_Map needs it. // Descr () - : data_ (), count_ (0) + : msg_ (), count_ (0) { } - Descr (Data_ptr d) - : data_ (d), count_ (0) + Descr (Message_ptr msg) + : msg_ (msg), count_ (0) { } @@ -59,17 +59,14 @@ namespace ACE_RMCast count_ = 0; } - // It would be logical to return data_ptr but ACE ref_auto_ptr - // hasn't learned how to convert between pointers yet. - // - Profile_ptr - data () const + Message_ptr + message () const { - return Profile_ptr (new Data (*data_)); + return msg_->clone (); } private: - Data_ptr data_; + Message_ptr msg_; unsigned long count_; }; @@ -89,6 +86,8 @@ namespace ACE_RMCast Mutex mutex_; Condition cond_; + u64 sn_; + bool stop_; ACE_Thread_Manager tracker_mgr_; }; diff --git a/protocols/ace/RMCast/Simulator.h b/protocols/ace/RMCast/Simulator.h index f8ffe13473f..27b227f488b 100644 --- a/protocols/ace/RMCast/Simulator.h +++ b/protocols/ace/RMCast/Simulator.h @@ -1,5 +1,3 @@ -// -*- C++ -*- - // file : ace/RMCast/Simulator.h // author : Boris Kolpackov <boris@kolpackov.net> // cvs-id : $Id$ @@ -7,8 +5,6 @@ #ifndef ACE_RMCAST_SIMULATOR_H #define ACE_RMCAST_SIMULATOR_H -#include "ace/Thread_Mutex.h" - #include "Stack.h" #include "Protocol.h" #include "Bits.h" diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp index 31f2807ac53..519879ee489 100644 --- a/protocols/ace/RMCast/Socket.cpp +++ b/protocols/ace/RMCast/Socket.cpp @@ -15,11 +15,11 @@ #include "Protocol.h" #include "Bits.h" -#include "Link.h" -#include "Simulator.h" -#include "Retransmit.h" +#include "Fragment.h" +#include "Reassemble.h" #include "Acknowledge.h" - +#include "Retransmit.h" +#include "Link.h" #include "Socket.h" @@ -52,8 +52,6 @@ namespace ACE_RMCast private: bool loop_; - u64 sn_; //@@ lock? - Mutex mutex_; Condition cond_; @@ -61,9 +59,10 @@ namespace ACE_RMCast ACE_Pipe signal_pipe_; + ACE_Auto_Ptr<Fragment> fragment_; + ACE_Auto_Ptr<Reassemble> reassemble_; ACE_Auto_Ptr<Acknowledge> acknowledge_; ACE_Auto_Ptr<Retransmit> retransmit_; - ACE_Auto_Ptr<Simulator> simulator_; ACE_Auto_Ptr<Link> link_; }; @@ -71,31 +70,33 @@ namespace ACE_RMCast Socket_Impl:: Socket_Impl (Address const& a, bool loop, bool simulator) : loop_ (loop), - sn_ (1), cond_ (mutex_) { signal_pipe_.open (); + fragment_.reset (new Fragment ()); + reassemble_.reset (new Reassemble ()); acknowledge_.reset (new Acknowledge ()); retransmit_.reset (new Retransmit ()); - simulator_.reset (new Simulator ()); link_.reset (new Link (a, simulator)); // Start IN stack from top to bottom. // in_start (0); - acknowledge_->in_start (this); + fragment_->in_start (this); + reassemble_->in_start (fragment_.get ()); + acknowledge_->in_start (reassemble_.get ()); retransmit_->in_start (acknowledge_.get ()); - simulator_->in_start (retransmit_.get ()); - link_->in_start (simulator_.get ()); + link_->in_start (retransmit_.get ()); // Start OUT stack from bottom up. // link_->out_start (0); - simulator_->out_start (link_.get ()); - retransmit_->out_start (simulator_.get ()); + retransmit_->out_start (link_.get ()); acknowledge_->out_start (retransmit_.get ()); - out_start (acknowledge_.get ()); + reassemble_->out_start (acknowledge_.get ()); + fragment_->out_start (reassemble_.get ()); + out_start (fragment_.get ()); } Socket_Impl:: @@ -104,17 +105,19 @@ namespace ACE_RMCast // Stop OUT stack from top to bottom. // out_stop (); + fragment_->out_stop (); + reassemble_->out_stop (); acknowledge_->out_stop (); retransmit_->out_stop (); - simulator_->out_stop (); link_->out_stop (); // Stop IN stack from bottom up. // link_->in_stop (); - simulator_->in_stop (); retransmit_->in_stop (); acknowledge_->in_stop (); + reassemble_->in_stop (); + fragment_->in_stop (); in_stop (); } @@ -124,7 +127,6 @@ namespace ACE_RMCast { Message_ptr m (new Message); - m->add (Profile_ptr (new SN (sn_++))); m->add (Profile_ptr (new Data (buf, s))); // Qualification is for VC6 and VxWorks. diff --git a/protocols/ace/RMCast/Socket.h b/protocols/ace/RMCast/Socket.h index 9f516d488ce..98e3b8b0ba8 100644 --- a/protocols/ace/RMCast/Socket.h +++ b/protocols/ace/RMCast/Socket.h @@ -1,5 +1,3 @@ -// -*- C++ -*- - // file : ace/RMCast/Socket.h // author : Boris Kolpackov <boris@kolpackov.net> // cvs-id : $Id$ diff --git a/protocols/examples/RMCast/Send_Msg/Protocol.h b/protocols/examples/RMCast/Send_Msg/Protocol.h index c3edf43b1fb..9c7be6eb0c8 100644 --- a/protocols/examples/RMCast/Send_Msg/Protocol.h +++ b/protocols/examples/RMCast/Send_Msg/Protocol.h @@ -5,7 +5,7 @@ #ifndef PROTOCOL_H #define PROTOCOL_H -unsigned short const payload_size = 256; +unsigned short const payload_size = 512; unsigned long const message_count = 10000; struct Message diff --git a/protocols/tests/RMCast/Protocol.h b/protocols/tests/RMCast/Protocol.h index 18adc37fc3a..c0ecca3dae9 100644 --- a/protocols/tests/RMCast/Protocol.h +++ b/protocols/tests/RMCast/Protocol.h @@ -5,7 +5,7 @@ #ifndef PROTOCOL_H #define PROTOCOL_H -unsigned short const payload_size = 256; +unsigned short const payload_size = 1234; unsigned long const message_count = 1000; struct Message diff --git a/protocols/tests/RMCast/Sender.cpp b/protocols/tests/RMCast/Sender.cpp index 70b97049443..85340b6b4a0 100644 --- a/protocols/tests/RMCast/Sender.cpp +++ b/protocols/tests/RMCast/Sender.cpp @@ -39,7 +39,7 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) // Keep running in case retransmissions are needed. // - ACE_OS::sleep (ACE_Time_Value (50, 0)); + ACE_OS::sleep (ACE_Time_Value (60, 0)); return 0; } |