From 3c1a0828b68565e11715142336d5ac9cf1341afd Mon Sep 17 00:00:00 2001 From: boris Date: Mon, 11 Jul 2005 18:50:23 +0000 Subject: ChangeLogTag:Mon Jul 11 20:08:51 2005 Boris Kolpackov --- protocols/ace/RMCast/Acknowledge.cpp | 42 ++++---- protocols/ace/RMCast/Acknowledge.h | 5 +- protocols/ace/RMCast/Flow.cpp | 133 ++++++++++++++++++++++++ protocols/ace/RMCast/Flow.h | 43 ++++++++ protocols/ace/RMCast/Fragment.cpp | 23 +++- protocols/ace/RMCast/Fragment.h | 9 +- protocols/ace/RMCast/Link.cpp | 28 +++-- protocols/ace/RMCast/Link.h | 6 +- protocols/ace/RMCast/Protocol.h | 35 ++++--- protocols/ace/RMCast/Reassemble.cpp | 4 +- protocols/ace/RMCast/Reassemble.h | 5 +- protocols/ace/RMCast/Retransmit.cpp | 29 +++--- protocols/ace/RMCast/Retransmit.h | 7 +- protocols/ace/RMCast/Socket.cpp | 41 +++++--- protocols/ace/RMCast/Socket.h | 5 +- protocols/examples/RMCast/Send_Msg/Protocol.h | 6 +- protocols/examples/RMCast/Send_Msg/Receiver.cpp | 29 ++++-- 17 files changed, 350 insertions(+), 100 deletions(-) create mode 100644 protocols/ace/RMCast/Flow.cpp create mode 100644 protocols/ace/RMCast/Flow.h (limited to 'protocols') diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp index 20965f7bbf7..3d2c4ec5230 100644 --- a/protocols/ace/RMCast/Acknowledge.cpp +++ b/protocols/ace/RMCast/Acknowledge.cpp @@ -16,14 +16,11 @@ using std::endl; namespace ACE_RMCast { - ACE_Time_Value const tick (0, 5000); - unsigned long const nak_timeout = 20; // # of ticks. - unsigned long const nrtm_timeout = 50; // # of ticks. - Acknowledge:: - Acknowledge () - : cond_ (mutex_), - nrtm_timer_ (nrtm_timeout), + Acknowledge (Parameters const& params) + : params_ (params), + cond_ (mutex_), + nrtm_timer_ (params_.nrtm_timeout ()), stop_ (false) { } @@ -110,10 +107,13 @@ namespace ACE_RMCast if (--nrtm_timer_ == 0) { - nrtm_timer_ = nrtm_timeout; + nrtm_timer_ = params_.nrtm_timeout (); // Send NRTM. // + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + u32 max_elem (NRTM::max_count (max_payload_size)); Profile_ptr nrtm (create_nrtm (max_elem)); @@ -141,7 +141,7 @@ namespace ACE_RMCast // { ACE_Time_Value time (ACE_OS::gettimeofday ()); - time += tick; + time += params_.tick (); Lock l (mutex_); @@ -165,6 +165,9 @@ namespace ACE_RMCast void Acknowledge:: track_queue (Address const& addr, Queue& q, Messages& msgs) { + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + u32 max_elem (NAK::max_count (max_payload_size)); u32 count (0); @@ -192,14 +195,14 @@ namespace ACE_RMCast //@@ Need exp fallback. // d.nak_count (d.nak_count () + 1); - d.timer ((d.nak_count () + 1) * nak_timeout); + d.timer ((d.nak_count () + 1) * params_.nak_timeout ()); nak->add (sn); ++count; - //cerr << 6 << "NAK # " << d.nak_count () << ": " - // << addr << " " << sn << endl; + // cerr << 6 << "NAK # " << d.nak_count () << ": " + // << addr << " " << sn << endl; } } } @@ -219,12 +222,6 @@ namespace ACE_RMCast } } - /* - if (count > max_elem) - cerr << "NAC count : " << count << endl - << "NAK max : " << max_elem << endl; - */ - // Detect and record new losses. // for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) @@ -282,10 +279,6 @@ namespace ACE_RMCast // First message from this source. // hold_.bind (from, Queue (sn)); - //@@ rm - // - hold_.find (from, e); - in_->recv (m); } else @@ -330,6 +323,9 @@ namespace ACE_RMCast { if (Data const* data = static_cast (m->find (Data::id))) { + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + if (max_payload_size > data->size ()) { u32 max_size (max_payload_size - data->size ()); @@ -346,7 +342,7 @@ namespace ACE_RMCast } } - nrtm_timer_ = nrtm_timeout; // Reset timer. + nrtm_timer_ = params_.nrtm_timeout (); // Reset timer. } out_->send (m); diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h index ec0ff77d12b..ebabf6ec4a4 100644 --- a/protocols/ace/RMCast/Acknowledge.h +++ b/protocols/ace/RMCast/Acknowledge.h @@ -11,13 +11,14 @@ #include "Stack.h" #include "Protocol.h" #include "Bits.h" +#include "Parameters.h" namespace ACE_RMCast { class Acknowledge : public Element { public: - Acknowledge (); + Acknowledge (Parameters const& params); virtual void in_start (In_Element* in); @@ -223,6 +224,8 @@ namespace ACE_RMCast track_thunk (void* obj); private: + Parameters const& params_; + Map hold_; Mutex mutex_; Condition cond_; diff --git a/protocols/ace/RMCast/Flow.cpp b/protocols/ace/RMCast/Flow.cpp new file mode 100644 index 00000000000..d61837030de --- /dev/null +++ b/protocols/ace/RMCast/Flow.cpp @@ -0,0 +1,133 @@ +// file : ace/RMCast/Flow.cpp +// author : Boris Kolpackov +// cvs-id : $Id$ + +#include "Flow.h" + +/* +#include +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Flow:: + Flow (Parameters const& params) + : params_ (params), + nak_time_ (0, 0), + sample_start_time_ (0, 0), + sample_bytes_ (0), + current_tput_ (0.0), + cap_tput_ (0.0) + { + } + + void Flow:: + send (Message_ptr m) + { + if (Data const* data = static_cast (m->find (Data::id))) + { + ACE_Time_Value now_time (ACE_OS::gettimeofday ()); + + Lock l (mutex_); + sample_bytes_ += data->size (); + + if (sample_start_time_ == ACE_Time_Value (0, 0)) + { + sample_start_time_ = now_time; + } + else + { + ACE_Time_Value delta (now_time - sample_start_time_); + + if (delta > ACE_Time_Value (0, 2000)) + { + current_tput_ = + double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ()); + + // cerr << "tput: " << current_tput_ << " bytes/usec" << endl; + + sample_bytes_ = 0; + sample_start_time_ = ACE_Time_Value (0, 0); + } + } + + if (cap_tput_ != 0.0 + && current_tput_ != 0.0 + && current_tput_ > cap_tput_) + { + double dev = (current_tput_ - cap_tput_) / current_tput_; + + // cerr << "deviation: " << dev << endl; + + // Cap decay algorithm. + // + { + ACE_Time_Value delta (now_time - nak_time_); + + unsigned long msec = delta.msec (); + + double x = msec / -16000.0; + double y = 1.0 * exp (x); + cap_tput_ = cap_tput_ / y; + + // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl; + } + + l.release (); + + + timespec time; + time.tv_sec = 0; + time.tv_nsec = static_cast (dev * 500000.0); + + // Don't bother to sleep if the time is less than 10 usec. + // + if (time.tv_nsec > 10000) + ACE_OS::sleep (ACE_Time_Value (time)); + } + } + + out_->send (m); + } + + void Flow:: + recv (Message_ptr m) + { + if (NAK const* nak = static_cast (m->find (NAK::id))) + { + Address to (static_cast (m->find (To::id))->address ()); + + if (nak->address () == to) + { + // This one is for us. + // + + //cerr << "NAK from " + // << static_cast (m->find (From::id))->address () + // << " for " << nak->count () << " sns." << endl; + + + ACE_Time_Value nak_time (ACE_OS::gettimeofday ()); + + Lock l (mutex_); + + nak_time_ = nak_time; + + if (cap_tput_ == 0.0) + cap_tput_ = current_tput_; + + if (cap_tput_ != 0.0) + { + cap_tput_ = cap_tput_ - cap_tput_ / 6.0; + + // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl; + } + } + } + + in_->recv (m); + } +} + diff --git a/protocols/ace/RMCast/Flow.h b/protocols/ace/RMCast/Flow.h new file mode 100644 index 00000000000..e64d0d438a2 --- /dev/null +++ b/protocols/ace/RMCast/Flow.h @@ -0,0 +1,43 @@ +// file : ace/RMCast/Flow.h +// author : Boris Kolpackov +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_FLOW_H +#define ACE_RMCAST_FLOW_H + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" +#include "Parameters.h" + +namespace ACE_RMCast +{ + class Flow : public Element + { + public: + Flow (Parameters const& params); + + public: + virtual void + send (Message_ptr m); + + virtual void + recv (Message_ptr m); + + private: + Parameters const& params_; + + Mutex mutex_; + ACE_Time_Value nak_time_; + + // Throughput sampling. + // + ACE_Time_Value sample_start_time_; + unsigned long sample_bytes_; + double current_tput_; + double cap_tput_; + }; +} + + +#endif // ACE_RMCAST_FLOW_H diff --git a/protocols/ace/RMCast/Fragment.cpp b/protocols/ace/RMCast/Fragment.cpp index f8a2fa97515..7b9cfa49cf6 100644 --- a/protocols/ace/RMCast/Fragment.cpp +++ b/protocols/ace/RMCast/Fragment.cpp @@ -13,7 +13,9 @@ using std::endl; namespace ACE_RMCast { Fragment:: - Fragment () + Fragment (Parameters const& params) + : params_ (params), + sn_ (1) { } @@ -22,8 +24,19 @@ namespace ACE_RMCast { if (Data const* data = static_cast (m->find (Data::id))) { + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + if (data->size () <= max_payload_size) { + u64 sn; + { + Lock l (mutex_); + sn = sn_++; + } + + m->add (Profile_ptr (new SN (sn))); + out_->send (m); return; } @@ -38,7 +51,6 @@ namespace ACE_RMCast // cerr << "size : " << size << endl // << "packs: " << packets << endl; - for (u32 i (1); i <= packets; ++i) { Message_ptr part (new Message); @@ -47,6 +59,13 @@ namespace ACE_RMCast // cerr << "pack: " << s << endl; + u64 sn; + { + Lock l (mutex_); + sn = sn_++; + } + + part->add (Profile_ptr (new SN (sn))); part->add (Profile_ptr (new Part (i, packets, size))); part->add (Profile_ptr (new Data (p, s))); diff --git a/protocols/ace/RMCast/Fragment.h b/protocols/ace/RMCast/Fragment.h index 836307e71a8..bfaa3044c83 100644 --- a/protocols/ace/RMCast/Fragment.h +++ b/protocols/ace/RMCast/Fragment.h @@ -8,17 +8,24 @@ #include "Stack.h" #include "Protocol.h" #include "Bits.h" +#include "Parameters.h" namespace ACE_RMCast { class Fragment : public Element { public: - Fragment (); + Fragment (Parameters const& params); public: virtual void send (Message_ptr m); + + Parameters const& params_; + + private: + Mutex mutex_; + u64 sn_; }; } diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp index 9e7203d9c61..f36815a96b3 100644 --- a/protocols/ace/RMCast/Link.cpp +++ b/protocols/ace/RMCast/Link.cpp @@ -9,27 +9,23 @@ namespace ACE_RMCast { - // Time period after which a manual cancellation request is - // checked for. - // - ACE_Time_Value const timeout (0, 500); - Link:: - Link (Address const& addr, bool simulator) - : addr_ (addr), + Link (Address const& addr, Parameters const& params) + : params_ (params), + addr_ (addr), ssock_ (Address (static_cast (0), static_cast (INADDR_ANY)), AF_INET, IPPROTO_UDP, 1), - stop_ (false), - simulator_ (simulator) + stop_ (false) { srand (time (0)); rsock_.set_option (IP_MULTICAST_LOOP, 0); + // rsock_.set_option (IP_MULTICAST_TTL, 0); // Set recv/send buffers. // @@ -104,9 +100,9 @@ namespace ACE_RMCast { // Simulate message loss and reordering. // - if (simulator_) + if (params_.simulator ()) { - if ((rand () % 5) != 0) + if ((rand () % 17) != 0) { Lock l (mutex_); @@ -118,7 +114,7 @@ namespace ACE_RMCast } else { - if ((rand () % 5) != 0) + if ((rand () % 17) != 0) { send_ (m); } @@ -152,11 +148,11 @@ namespace ACE_RMCast os << *m; - if (os.length () > max_packet_size) + if (os.length () > params_.max_packet_size ()) { ACE_ERROR ((LM_ERROR, "packet length (%d) exceeds max_poacket_size (%d)\n", - os.length (), max_packet_size)); + os.length (), params_.max_packet_size ())); for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ()) { @@ -197,11 +193,11 @@ namespace ACE_RMCast Address addr; - // Block for up to timeout time waiting for an incomming message. + // Block for up to one tick waiting for an incomming message. // for (;;) { - ACE_Time_Value t (timeout); + ACE_Time_Value t (params_.tick ()); ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t); diff --git a/protocols/ace/RMCast/Link.h b/protocols/ace/RMCast/Link.h index 337791dda79..cff166c6e13 100644 --- a/protocols/ace/RMCast/Link.h +++ b/protocols/ace/RMCast/Link.h @@ -12,13 +12,14 @@ #include "Stack.h" #include "Protocol.h" +#include "Parameters.h" namespace ACE_RMCast { class Link : public Element { public: - Link (Address const& addr, bool simulator); + Link (Address const& addr, Parameters const& params); virtual void in_start (In_Element* in); @@ -49,6 +50,8 @@ namespace ACE_RMCast recv (Message_ptr); private: + Parameters const& params_; + Address addr_, self_; ACE_SOCK_Dgram_Mcast rsock_; ACE_SOCK_Dgram ssock_; @@ -58,7 +61,6 @@ namespace ACE_RMCast // Simulator. // - bool simulator_; Message_ptr hold_; Mutex mutex_; diff --git a/protocols/ace/RMCast/Protocol.h b/protocols/ace/RMCast/Protocol.h index 6d8adb6799f..3eabb5fa47a 100644 --- a/protocols/ace/RMCast/Protocol.h +++ b/protocols/ace/RMCast/Protocol.h @@ -20,6 +20,7 @@ #include "Bits.h" +#include namespace ACE_RMCast { @@ -32,10 +33,8 @@ namespace ACE_RMCast // Protocol parameters // // - u32 const max_packet_size = 1470; // MTU (1500) - IP-header - UDP-header - u32 const max_service_size = 60; // service profiles (Part, SN, etc), sizes - // plus message size. - u32 const max_payload_size = max_packet_size - max_service_size; + unsigned short const max_service_size = 60; // service profiles (Part, SN, + // etc), sizes plus message size. // // @@ -66,7 +65,7 @@ namespace ACE_RMCast struct Profile; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr Profile_ptr; struct Profile @@ -370,7 +369,7 @@ namespace ACE_RMCast struct From; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr From_ptr; struct From : Profile @@ -454,7 +453,7 @@ namespace ACE_RMCast struct To; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr To_ptr; struct To : Profile @@ -538,7 +537,7 @@ namespace ACE_RMCast struct Data; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr Data_ptr; struct Data : Profile @@ -546,6 +545,13 @@ namespace ACE_RMCast static u16 const id; public: + virtual + ~Data () + { + if (buf_) + operator delete (buf_); + } + Data (Header const& h, istream& is) : Profile (h), buf_ (0), @@ -664,7 +670,7 @@ namespace ACE_RMCast struct SN; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr SN_ptr; struct SN : Profile @@ -734,7 +740,7 @@ namespace ACE_RMCast class NAK; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr NAK_ptr; class NAK : public Profile @@ -930,7 +936,7 @@ namespace ACE_RMCast struct NRTM; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr NRTM_ptr; struct NRTM : Profile @@ -1104,7 +1110,7 @@ namespace ACE_RMCast struct NoData; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr NoData_ptr; struct NoData : Profile @@ -1153,13 +1159,14 @@ namespace ACE_RMCast } }; + // // // struct Part; typedef - ACE_Refcounted_Auto_Ptr + ACE_Refcounted_Auto_Ptr Part_ptr; struct Part : Profile @@ -1253,7 +1260,7 @@ namespace ACE_RMCast /* inline std::ostream& -operator<< (std::ostream& os, RMCast::Address const& a) +operator<< (std::ostream& os, ACE_RMCast::Address const& a) { char buf[64]; a.addr_to_string (buf, 64, 1); diff --git a/protocols/ace/RMCast/Reassemble.cpp b/protocols/ace/RMCast/Reassemble.cpp index e99b5e9d98d..e4033b1249f 100644 --- a/protocols/ace/RMCast/Reassemble.cpp +++ b/protocols/ace/RMCast/Reassemble.cpp @@ -13,7 +13,8 @@ using std::endl; namespace ACE_RMCast { Reassemble:: - Reassemble () + Reassemble (Parameters const& params) + : params_ (params) { } @@ -56,6 +57,7 @@ namespace ACE_RMCast if (part->num () == 1) abort (); + Data const* data = static_cast (m->find (Data::id)); Data_ptr& new_data = e->int_id_; diff --git a/protocols/ace/RMCast/Reassemble.h b/protocols/ace/RMCast/Reassemble.h index 0f074c9855c..cffa4fdc359 100644 --- a/protocols/ace/RMCast/Reassemble.h +++ b/protocols/ace/RMCast/Reassemble.h @@ -10,19 +10,22 @@ #include "Stack.h" #include "Protocol.h" #include "Bits.h" +#include "Parameters.h" namespace ACE_RMCast { class Reassemble : public Element { public: - Reassemble (); + Reassemble (Parameters const& params); public: virtual void recv (Message_ptr m); private: + Parameters const& params_; + typedef ACE_Hash_Map_Manager_Ex +using std::cerr; +using std::endl; +*/ + namespace ACE_RMCast { - ACE_Time_Value const tick (0, 50000); - unsigned long const retention_time = 60; // How many ticks to retain. - Retransmit:: - Retransmit () - : cond_ (mutex_), - sn_ (1), + Retransmit (Parameters const& params) + : params_ (params), + cond_ (mutex_), stop_ (false) { } @@ -47,10 +50,10 @@ namespace ACE_RMCast { if (m->find (Data::id) != 0) { - m->add (Profile_ptr (new SN (sn_))); + SN const* sn = static_cast (m->find (SN::id)); Lock l (mutex_); - queue_.bind (sn_++, Descr (m->clone ())); + queue_.bind (sn->num (), Descr (m->clone ())); } out_->send (m); @@ -99,10 +102,8 @@ namespace ACE_RMCast } } } - else - { - in_->recv (m); - } + + in_->recv (m); } ACE_THR_FUNC_RETURN Retransmit:: @@ -121,7 +122,7 @@ namespace ACE_RMCast for (Queue::iterator i (queue_); !i.done ();) { - if ((*i).int_id_.inc () >= retention_time) + if ((*i).int_id_.inc () >= params_.retention_timeout ()) { u64 sn ((*i).ext_id_); i.advance (); @@ -136,7 +137,7 @@ namespace ACE_RMCast // Go to sleep but watch for "manual cancellation" request. // ACE_Time_Value time (ACE_OS::gettimeofday ()); - time += tick; + time += params_.tick (); while (!stop_) { diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h index c76299a8d5b..0a3a8f72bd7 100644 --- a/protocols/ace/RMCast/Retransmit.h +++ b/protocols/ace/RMCast/Retransmit.h @@ -11,13 +11,14 @@ #include "Stack.h" #include "Protocol.h" #include "Bits.h" +#include "Parameters.h" namespace ACE_RMCast { class Retransmit : public Element { public: - Retransmit (); + Retransmit (Parameters const& params); virtual void out_start (Out_Element* out); @@ -82,12 +83,12 @@ namespace ACE_RMCast track_thunk (void* obj); private: + Parameters const& params_; + Queue queue_; Mutex mutex_; Condition cond_; - u64 sn_; - bool stop_; ACE_Thread_Manager tracker_mgr_; }; diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp index 519879ee489..4ed634bd59d 100644 --- a/protocols/ace/RMCast/Socket.cpp +++ b/protocols/ace/RMCast/Socket.cpp @@ -19,10 +19,17 @@ #include "Reassemble.h" #include "Acknowledge.h" #include "Retransmit.h" +#include "Flow.h" #include "Link.h" #include "Socket.h" +/* +#include +using std::cerr; +using std::endl; +*/ + namespace ACE_RMCast { class Socket_Impl : protected Element @@ -30,7 +37,7 @@ namespace ACE_RMCast public: ~Socket_Impl (); - Socket_Impl (Address const& a, bool loop, bool simulator); + Socket_Impl (Address const& a, bool loop, Parameters const& params); public: void @@ -51,6 +58,7 @@ namespace ACE_RMCast private: bool loop_; + Parameters const params_; Mutex mutex_; Condition cond_; @@ -63,22 +71,25 @@ namespace ACE_RMCast ACE_Auto_Ptr reassemble_; ACE_Auto_Ptr acknowledge_; ACE_Auto_Ptr retransmit_; + ACE_Auto_Ptr flow_; ACE_Auto_Ptr link_; }; Socket_Impl:: - Socket_Impl (Address const& a, bool loop, bool simulator) + Socket_Impl (Address const& a, bool loop, Parameters const& params) : loop_ (loop), + params_ (params), cond_ (mutex_) { signal_pipe_.open (); - fragment_.reset (new Fragment ()); - reassemble_.reset (new Reassemble ()); - acknowledge_.reset (new Acknowledge ()); - retransmit_.reset (new Retransmit ()); - link_.reset (new Link (a, simulator)); + fragment_.reset (new Fragment (params_)); + reassemble_.reset (new Reassemble (params_)); + acknowledge_.reset (new Acknowledge (params_)); + retransmit_.reset (new Retransmit (params_)); + flow_.reset (new Flow (params_)); + link_.reset (new Link (a, params_)); // Start IN stack from top to bottom. // @@ -87,12 +98,14 @@ namespace ACE_RMCast reassemble_->in_start (fragment_.get ()); acknowledge_->in_start (reassemble_.get ()); retransmit_->in_start (acknowledge_.get ()); - link_->in_start (retransmit_.get ()); + flow_->in_start (retransmit_.get ()); + link_->in_start (flow_.get ()); // Start OUT stack from bottom up. // link_->out_start (0); - retransmit_->out_start (link_.get ()); + flow_->out_start (link_.get ()); + retransmit_->out_start (flow_.get ()); acknowledge_->out_start (retransmit_.get ()); reassemble_->out_start (acknowledge_.get ()); fragment_->out_start (reassemble_.get ()); @@ -109,11 +122,13 @@ namespace ACE_RMCast reassemble_->out_stop (); acknowledge_->out_stop (); retransmit_->out_stop (); + flow_->out_stop (); link_->out_stop (); // Stop IN stack from bottom up. // link_->in_stop (); + flow_->in_stop (); retransmit_->in_stop (); acknowledge_->in_stop (); reassemble_->in_stop (); @@ -269,6 +284,9 @@ namespace ACE_RMCast Lock l (mutex_); + //if (queue_.size () != 0) + // cerr << "recv socket queue size: " << queue_.size () << endl; + bool signal (queue_.is_empty ()); queue_.enqueue_tail (m); @@ -287,7 +305,6 @@ namespace ACE_RMCast cond_.signal (); } - } } @@ -301,8 +318,8 @@ namespace ACE_RMCast } Socket:: - Socket (Address const& a, bool loop, bool simulator) - : impl_ (new Socket_Impl (a, loop, simulator)) + Socket (Address const& a, bool loop, Parameters const& params) + : impl_ (new Socket_Impl (a, loop, params)) { } diff --git a/protocols/ace/RMCast/Socket.h b/protocols/ace/RMCast/Socket.h index 98e3b8b0ba8..c1a0a26a565 100644 --- a/protocols/ace/RMCast/Socket.h +++ b/protocols/ace/RMCast/Socket.h @@ -12,6 +12,7 @@ #include "ace/Time_Value.h" #include "RMCast_Export.h" +#include "Parameters.h" namespace ACE_RMCast @@ -27,7 +28,9 @@ namespace ACE_RMCast // If 'simulator' is 'true' then internal message loss and // reordering simulator (on IPv4 level) is turned on. // - Socket (ACE_INET_Addr const& a, bool loop = true, bool simulator = false); + Socket (ACE_INET_Addr const& a, + bool loop = true, + Parameters const& params = Parameters ()); public: virtual void diff --git a/protocols/examples/RMCast/Send_Msg/Protocol.h b/protocols/examples/RMCast/Send_Msg/Protocol.h index 9c7be6eb0c8..88fdb4d6a51 100644 --- a/protocols/examples/RMCast/Send_Msg/Protocol.h +++ b/protocols/examples/RMCast/Send_Msg/Protocol.h @@ -5,12 +5,12 @@ #ifndef PROTOCOL_H #define PROTOCOL_H -unsigned short const payload_size = 512; -unsigned long const message_count = 10000; +unsigned short const payload_size = 702; +unsigned long const message_count = 80000; struct Message { - unsigned long sn; + unsigned int sn; unsigned short payload[payload_size]; }; diff --git a/protocols/examples/RMCast/Send_Msg/Receiver.cpp b/protocols/examples/RMCast/Send_Msg/Receiver.cpp index 39808151fe9..f86e95192f6 100644 --- a/protocols/examples/RMCast/Send_Msg/Receiver.cpp +++ b/protocols/examples/RMCast/Send_Msg/Receiver.cpp @@ -5,6 +5,8 @@ #include "ace/Vector_T.h" #include "ace/Log_Msg.h" #include "ace/OS_NS_string.h" +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_sys_time.h" // gettimeofday #include "ace/RMCast/Socket.h" @@ -47,7 +49,7 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) // VC6 does not know about new rules. // { - for (unsigned long i = 0; i < message_count; ++i) + for (unsigned int i = 0; i < message_count; ++i) { received.push_back (0); damaged.push_back (0); @@ -57,11 +59,19 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) Message msg; + bool first (true); + ACE_Time_Value start_time, time; while (true) { ssize_t s = socket.size (); + if (first) + { + start_time = ACE_OS::gettimeofday (); + first = false; + } + if (s == -1 && errno == ENOENT) { ACE_ERROR ((LM_ERROR, "unavailable message detected\n")); @@ -106,7 +116,9 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) if (msg.sn + 1 == message_count) break; } - unsigned long lost_count (0), damaged_count (0), duplicate_count (0); + time = ACE_OS::gettimeofday () - start_time; + + unsigned int lost_count (0), damaged_count (0), duplicate_count (0); { for (Status_List::Iterator i (received); !i.done (); i.advance ()) @@ -140,13 +152,18 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[]) } } + unsigned long tput = + (sizeof (msg) * message_count) / (time.msec () == 0 ? 1 : time.msec ()); + ACE_DEBUG ((LM_DEBUG, - "lost : %d\n" - "damaged : %d\n" - "duplicate : %d\n", + "lost : %d\n" + "damaged : %d\n" + "duplicate : %d\n" + "throughput : %d KB/sec\n", lost_count, damaged_count, - duplicate_count)); + duplicate_count, + tput)); /* cout << "lost message dump:" << endl; -- cgit v1.2.1