summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-07-11 18:50:23 +0000
committerboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-07-11 18:50:23 +0000
commit3c1a0828b68565e11715142336d5ac9cf1341afd (patch)
tree9639bb30ccf320ebbba2dce38af66e850b25805d
parent0c51de4219486246601f6d1be31c730e71a790e1 (diff)
downloadATCD-3c1a0828b68565e11715142336d5ac9cf1341afd.tar.gz
ChangeLogTag:Mon Jul 11 20:08:51 2005 Boris Kolpackov <boris@kolpackov.net>
-rw-r--r--ChangeLog29
-rw-r--r--protocols/ace/RMCast/Acknowledge.cpp42
-rw-r--r--protocols/ace/RMCast/Acknowledge.h5
-rw-r--r--protocols/ace/RMCast/Flow.cpp133
-rw-r--r--protocols/ace/RMCast/Flow.h43
-rw-r--r--protocols/ace/RMCast/Fragment.cpp23
-rw-r--r--protocols/ace/RMCast/Fragment.h9
-rw-r--r--protocols/ace/RMCast/Link.cpp28
-rw-r--r--protocols/ace/RMCast/Link.h6
-rw-r--r--protocols/ace/RMCast/Protocol.h35
-rw-r--r--protocols/ace/RMCast/Reassemble.cpp4
-rw-r--r--protocols/ace/RMCast/Reassemble.h5
-rw-r--r--protocols/ace/RMCast/Retransmit.cpp29
-rw-r--r--protocols/ace/RMCast/Retransmit.h7
-rw-r--r--protocols/ace/RMCast/Socket.cpp41
-rw-r--r--protocols/ace/RMCast/Socket.h5
-rw-r--r--protocols/examples/RMCast/Send_Msg/Protocol.h6
-rw-r--r--protocols/examples/RMCast/Send_Msg/Receiver.cpp29
18 files changed, 379 insertions, 100 deletions
diff --git a/ChangeLog b/ChangeLog
index 9455b486e25..8762386f63a 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,32 @@
+Mon Jul 11 20:08:51 2005 Boris Kolpackov <boris@kolpackov.net>
+
+ * protocols/ace/RMCast/Flow.cpp:
+ * protocols/ace/RMCast/Flow.h:
+
+ Implemented flow control stack element.
+
+ * protocols/ace/RMCast/Acknowledge.cpp:
+ * protocols/ace/RMCast/Acknowledge.h:
+ * protocols/ace/RMCast/Fragment.cpp:
+ * protocols/ace/RMCast/Fragment.h:
+ * protocols/ace/RMCast/Link.cpp:
+ * protocols/ace/RMCast/Link.h:
+ * protocols/ace/RMCast/Protocol.h:
+ * protocols/ace/RMCast/Reassemble.cpp:
+ * protocols/ace/RMCast/Reassemble.h:
+ * protocols/ace/RMCast/Retransmit.cpp:
+ * protocols/ace/RMCast/Retransmit.h:
+ * protocols/ace/RMCast/Socket.cpp:
+ * protocols/ace/RMCast/Socket.h:
+
+ Made protocol parameters configurable on a pee-instance
+ basis.
+
+ * protocols/examples/RMCast/Send_Msg/Protocol.h:
+ * protocols/examples/RMCast/Send_Msg/Receiver.cpp:
+
+ Made receiver print throughput information.
+
Mon Jul 11 08:41:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl>
* ace/OS_NS_Thread.h:
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp
index 20965f7bbf7..3d2c4ec5230 100644
--- a/protocols/ace/RMCast/Acknowledge.cpp
+++ b/protocols/ace/RMCast/Acknowledge.cpp
@@ -16,14 +16,11 @@ using std::endl;
namespace ACE_RMCast
{
- ACE_Time_Value const tick (0, 5000);
- unsigned long const nak_timeout = 20; // # of ticks.
- unsigned long const nrtm_timeout = 50; // # of ticks.
-
Acknowledge::
- Acknowledge ()
- : cond_ (mutex_),
- nrtm_timer_ (nrtm_timeout),
+ Acknowledge (Parameters const& params)
+ : params_ (params),
+ cond_ (mutex_),
+ nrtm_timer_ (params_.nrtm_timeout ()),
stop_ (false)
{
}
@@ -110,10 +107,13 @@ namespace ACE_RMCast
if (--nrtm_timer_ == 0)
{
- nrtm_timer_ = nrtm_timeout;
+ nrtm_timer_ = params_.nrtm_timeout ();
// Send NRTM.
//
+ unsigned short max_payload_size (
+ params_.max_packet_size () - max_service_size);
+
u32 max_elem (NRTM::max_count (max_payload_size));
Profile_ptr nrtm (create_nrtm (max_elem));
@@ -141,7 +141,7 @@ namespace ACE_RMCast
//
{
ACE_Time_Value time (ACE_OS::gettimeofday ());
- time += tick;
+ time += params_.tick ();
Lock l (mutex_);
@@ -165,6 +165,9 @@ namespace ACE_RMCast
void Acknowledge::
track_queue (Address const& addr, Queue& q, Messages& msgs)
{
+ unsigned short max_payload_size (
+ params_.max_packet_size () - max_service_size);
+
u32 max_elem (NAK::max_count (max_payload_size));
u32 count (0);
@@ -192,14 +195,14 @@ namespace ACE_RMCast
//@@ Need exp fallback.
//
d.nak_count (d.nak_count () + 1);
- d.timer ((d.nak_count () + 1) * nak_timeout);
+ d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
nak->add (sn);
++count;
- //cerr << 6 << "NAK # " << d.nak_count () << ": "
- // << addr << " " << sn << endl;
+ // cerr << 6 << "NAK # " << d.nak_count () << ": "
+ // << addr << " " << sn << endl;
}
}
}
@@ -219,12 +222,6 @@ namespace ACE_RMCast
}
}
- /*
- if (count > max_elem)
- cerr << "NAC count : " << count << endl
- << "NAK max : " << max_elem << endl;
- */
-
// Detect and record new losses.
//
for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
@@ -282,10 +279,6 @@ namespace ACE_RMCast
// First message from this source.
//
hold_.bind (from, Queue (sn));
- //@@ rm
- //
- hold_.find (from, e);
-
in_->recv (m);
}
else
@@ -330,6 +323,9 @@ namespace ACE_RMCast
{
if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
+ unsigned short max_payload_size (
+ params_.max_packet_size () - max_service_size);
+
if (max_payload_size > data->size ())
{
u32 max_size (max_payload_size - data->size ());
@@ -346,7 +342,7 @@ namespace ACE_RMCast
}
}
- nrtm_timer_ = nrtm_timeout; // Reset timer.
+ nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
}
out_->send (m);
diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h
index ec0ff77d12b..ebabf6ec4a4 100644
--- a/protocols/ace/RMCast/Acknowledge.h
+++ b/protocols/ace/RMCast/Acknowledge.h
@@ -11,13 +11,14 @@
#include "Stack.h"
#include "Protocol.h"
#include "Bits.h"
+#include "Parameters.h"
namespace ACE_RMCast
{
class Acknowledge : public Element
{
public:
- Acknowledge ();
+ Acknowledge (Parameters const& params);
virtual void
in_start (In_Element* in);
@@ -223,6 +224,8 @@ namespace ACE_RMCast
track_thunk (void* obj);
private:
+ Parameters const& params_;
+
Map hold_;
Mutex mutex_;
Condition cond_;
diff --git a/protocols/ace/RMCast/Flow.cpp b/protocols/ace/RMCast/Flow.cpp
new file mode 100644
index 00000000000..d61837030de
--- /dev/null
+++ b/protocols/ace/RMCast/Flow.cpp
@@ -0,0 +1,133 @@
+// file : ace/RMCast/Flow.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include "Flow.h"
+
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
+namespace ACE_RMCast
+{
+ Flow::
+ Flow (Parameters const& params)
+ : params_ (params),
+ nak_time_ (0, 0),
+ sample_start_time_ (0, 0),
+ sample_bytes_ (0),
+ current_tput_ (0.0),
+ cap_tput_ (0.0)
+ {
+ }
+
+ void Flow::
+ send (Message_ptr m)
+ {
+ if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
+ {
+ ACE_Time_Value now_time (ACE_OS::gettimeofday ());
+
+ Lock l (mutex_);
+ sample_bytes_ += data->size ();
+
+ if (sample_start_time_ == ACE_Time_Value (0, 0))
+ {
+ sample_start_time_ = now_time;
+ }
+ else
+ {
+ ACE_Time_Value delta (now_time - sample_start_time_);
+
+ if (delta > ACE_Time_Value (0, 2000))
+ {
+ current_tput_ =
+ double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
+
+ // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
+
+ sample_bytes_ = 0;
+ sample_start_time_ = ACE_Time_Value (0, 0);
+ }
+ }
+
+ if (cap_tput_ != 0.0
+ && current_tput_ != 0.0
+ && current_tput_ > cap_tput_)
+ {
+ double dev = (current_tput_ - cap_tput_) / current_tput_;
+
+ // cerr << "deviation: " << dev << endl;
+
+ // Cap decay algorithm.
+ //
+ {
+ ACE_Time_Value delta (now_time - nak_time_);
+
+ unsigned long msec = delta.msec ();
+
+ double x = msec / -16000.0;
+ double y = 1.0 * exp (x);
+ cap_tput_ = cap_tput_ / y;
+
+ // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
+ }
+
+ l.release ();
+
+
+ timespec time;
+ time.tv_sec = 0;
+ time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
+
+ // Don't bother to sleep if the time is less than 10 usec.
+ //
+ if (time.tv_nsec > 10000)
+ ACE_OS::sleep (ACE_Time_Value (time));
+ }
+ }
+
+ out_->send (m);
+ }
+
+ void Flow::
+ recv (Message_ptr m)
+ {
+ if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
+ {
+ Address to (static_cast<To const*> (m->find (To::id))->address ());
+
+ if (nak->address () == to)
+ {
+ // This one is for us.
+ //
+
+ //cerr << "NAK from "
+ // << static_cast<From const*> (m->find (From::id))->address ()
+ // << " for " << nak->count () << " sns." << endl;
+
+
+ ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
+
+ Lock l (mutex_);
+
+ nak_time_ = nak_time;
+
+ if (cap_tput_ == 0.0)
+ cap_tput_ = current_tput_;
+
+ if (cap_tput_ != 0.0)
+ {
+ cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
+
+ // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
+ }
+ }
+ }
+
+ in_->recv (m);
+ }
+}
+
diff --git a/protocols/ace/RMCast/Flow.h b/protocols/ace/RMCast/Flow.h
new file mode 100644
index 00000000000..e64d0d438a2
--- /dev/null
+++ b/protocols/ace/RMCast/Flow.h
@@ -0,0 +1,43 @@
+// file : ace/RMCast/Flow.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_FLOW_H
+#define ACE_RMCAST_FLOW_H
+
+#include "Stack.h"
+#include "Protocol.h"
+#include "Bits.h"
+#include "Parameters.h"
+
+namespace ACE_RMCast
+{
+ class Flow : public Element
+ {
+ public:
+ Flow (Parameters const& params);
+
+ public:
+ virtual void
+ send (Message_ptr m);
+
+ virtual void
+ recv (Message_ptr m);
+
+ private:
+ Parameters const& params_;
+
+ Mutex mutex_;
+ ACE_Time_Value nak_time_;
+
+ // Throughput sampling.
+ //
+ ACE_Time_Value sample_start_time_;
+ unsigned long sample_bytes_;
+ double current_tput_;
+ double cap_tput_;
+ };
+}
+
+
+#endif // ACE_RMCAST_FLOW_H
diff --git a/protocols/ace/RMCast/Fragment.cpp b/protocols/ace/RMCast/Fragment.cpp
index f8a2fa97515..7b9cfa49cf6 100644
--- a/protocols/ace/RMCast/Fragment.cpp
+++ b/protocols/ace/RMCast/Fragment.cpp
@@ -13,7 +13,9 @@ using std::endl;
namespace ACE_RMCast
{
Fragment::
- Fragment ()
+ Fragment (Parameters const& params)
+ : params_ (params),
+ sn_ (1)
{
}
@@ -22,8 +24,19 @@ namespace ACE_RMCast
{
if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
+ unsigned short max_payload_size (
+ params_.max_packet_size () - max_service_size);
+
if (data->size () <= max_payload_size)
{
+ u64 sn;
+ {
+ Lock l (mutex_);
+ sn = sn_++;
+ }
+
+ m->add (Profile_ptr (new SN (sn)));
+
out_->send (m);
return;
}
@@ -38,7 +51,6 @@ namespace ACE_RMCast
// cerr << "size : " << size << endl
// << "packs: " << packets << endl;
-
for (u32 i (1); i <= packets; ++i)
{
Message_ptr part (new Message);
@@ -47,6 +59,13 @@ namespace ACE_RMCast
// cerr << "pack: " << s << endl;
+ u64 sn;
+ {
+ Lock l (mutex_);
+ sn = sn_++;
+ }
+
+ part->add (Profile_ptr (new SN (sn)));
part->add (Profile_ptr (new Part (i, packets, size)));
part->add (Profile_ptr (new Data (p, s)));
diff --git a/protocols/ace/RMCast/Fragment.h b/protocols/ace/RMCast/Fragment.h
index 836307e71a8..bfaa3044c83 100644
--- a/protocols/ace/RMCast/Fragment.h
+++ b/protocols/ace/RMCast/Fragment.h
@@ -8,17 +8,24 @@
#include "Stack.h"
#include "Protocol.h"
#include "Bits.h"
+#include "Parameters.h"
namespace ACE_RMCast
{
class Fragment : public Element
{
public:
- Fragment ();
+ Fragment (Parameters const& params);
public:
virtual void
send (Message_ptr m);
+
+ Parameters const& params_;
+
+ private:
+ Mutex mutex_;
+ u64 sn_;
};
}
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp
index 9e7203d9c61..f36815a96b3 100644
--- a/protocols/ace/RMCast/Link.cpp
+++ b/protocols/ace/RMCast/Link.cpp
@@ -9,27 +9,23 @@
namespace ACE_RMCast
{
- // Time period after which a manual cancellation request is
- // checked for.
- //
- ACE_Time_Value const timeout (0, 500);
-
Link::
- Link (Address const& addr, bool simulator)
- : addr_ (addr),
+ Link (Address const& addr, Parameters const& params)
+ : params_ (params),
+ addr_ (addr),
ssock_ (Address (static_cast<unsigned short> (0),
static_cast<ACE_UINT32> (INADDR_ANY)),
AF_INET,
IPPROTO_UDP,
1),
- stop_ (false),
- simulator_ (simulator)
+ stop_ (false)
{
srand (time (0));
rsock_.set_option (IP_MULTICAST_LOOP, 0);
+ // rsock_.set_option (IP_MULTICAST_TTL, 0);
// Set recv/send buffers.
//
@@ -104,9 +100,9 @@ namespace ACE_RMCast
{
// Simulate message loss and reordering.
//
- if (simulator_)
+ if (params_.simulator ())
{
- if ((rand () % 5) != 0)
+ if ((rand () % 17) != 0)
{
Lock l (mutex_);
@@ -118,7 +114,7 @@ namespace ACE_RMCast
}
else
{
- if ((rand () % 5) != 0)
+ if ((rand () % 17) != 0)
{
send_ (m);
}
@@ -152,11 +148,11 @@ namespace ACE_RMCast
os << *m;
- if (os.length () > max_packet_size)
+ if (os.length () > params_.max_packet_size ())
{
ACE_ERROR ((LM_ERROR,
"packet length (%d) exceeds max_poacket_size (%d)\n",
- os.length (), max_packet_size));
+ os.length (), params_.max_packet_size ()));
for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
{
@@ -197,11 +193,11 @@ namespace ACE_RMCast
Address addr;
- // Block for up to timeout time waiting for an incomming message.
+ // Block for up to one tick waiting for an incomming message.
//
for (;;)
{
- ACE_Time_Value t (timeout);
+ ACE_Time_Value t (params_.tick ());
ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
diff --git a/protocols/ace/RMCast/Link.h b/protocols/ace/RMCast/Link.h
index 337791dda79..cff166c6e13 100644
--- a/protocols/ace/RMCast/Link.h
+++ b/protocols/ace/RMCast/Link.h
@@ -12,13 +12,14 @@
#include "Stack.h"
#include "Protocol.h"
+#include "Parameters.h"
namespace ACE_RMCast
{
class Link : public Element
{
public:
- Link (Address const& addr, bool simulator);
+ Link (Address const& addr, Parameters const& params);
virtual void
in_start (In_Element* in);
@@ -49,6 +50,8 @@ namespace ACE_RMCast
recv (Message_ptr);
private:
+ Parameters const& params_;
+
Address addr_, self_;
ACE_SOCK_Dgram_Mcast rsock_;
ACE_SOCK_Dgram ssock_;
@@ -58,7 +61,6 @@ namespace ACE_RMCast
// Simulator.
//
- bool simulator_;
Message_ptr hold_;
Mutex mutex_;
diff --git a/protocols/ace/RMCast/Protocol.h b/protocols/ace/RMCast/Protocol.h
index 6d8adb6799f..3eabb5fa47a 100644
--- a/protocols/ace/RMCast/Protocol.h
+++ b/protocols/ace/RMCast/Protocol.h
@@ -20,6 +20,7 @@
#include "Bits.h"
+#include <iostream>
namespace ACE_RMCast
{
@@ -32,10 +33,8 @@ namespace ACE_RMCast
// Protocol parameters
//
//
- u32 const max_packet_size = 1470; // MTU (1500) - IP-header - UDP-header
- u32 const max_service_size = 60; // service profiles (Part, SN, etc), sizes
- // plus message size.
- u32 const max_payload_size = max_packet_size - max_service_size;
+ unsigned short const max_service_size = 60; // service profiles (Part, SN,
+ // etc), sizes plus message size.
//
//
@@ -66,7 +65,7 @@ namespace ACE_RMCast
struct Profile;
typedef
- ACE_Refcounted_Auto_Ptr<Profile, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<Profile, Mutex>
Profile_ptr;
struct Profile
@@ -370,7 +369,7 @@ namespace ACE_RMCast
struct From;
typedef
- ACE_Refcounted_Auto_Ptr<From, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<From, Mutex>
From_ptr;
struct From : Profile
@@ -454,7 +453,7 @@ namespace ACE_RMCast
struct To;
typedef
- ACE_Refcounted_Auto_Ptr<To, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<To, Mutex>
To_ptr;
struct To : Profile
@@ -538,7 +537,7 @@ namespace ACE_RMCast
struct Data;
typedef
- ACE_Refcounted_Auto_Ptr<Data, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<Data, Mutex>
Data_ptr;
struct Data : Profile
@@ -546,6 +545,13 @@ namespace ACE_RMCast
static u16 const id;
public:
+ virtual
+ ~Data ()
+ {
+ if (buf_)
+ operator delete (buf_);
+ }
+
Data (Header const& h, istream& is)
: Profile (h),
buf_ (0),
@@ -664,7 +670,7 @@ namespace ACE_RMCast
struct SN;
typedef
- ACE_Refcounted_Auto_Ptr<SN, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<SN, Mutex>
SN_ptr;
struct SN : Profile
@@ -734,7 +740,7 @@ namespace ACE_RMCast
class NAK;
typedef
- ACE_Refcounted_Auto_Ptr<NAK, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<NAK, Mutex>
NAK_ptr;
class NAK : public Profile
@@ -930,7 +936,7 @@ namespace ACE_RMCast
struct NRTM;
typedef
- ACE_Refcounted_Auto_Ptr<NRTM, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<NRTM, Mutex>
NRTM_ptr;
struct NRTM : Profile
@@ -1104,7 +1110,7 @@ namespace ACE_RMCast
struct NoData;
typedef
- ACE_Refcounted_Auto_Ptr<NoData, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<NoData, Mutex>
NoData_ptr;
struct NoData : Profile
@@ -1153,13 +1159,14 @@ namespace ACE_RMCast
}
};
+
//
//
//
struct Part;
typedef
- ACE_Refcounted_Auto_Ptr<Part, ACE_Null_Mutex>
+ ACE_Refcounted_Auto_Ptr<Part, Mutex>
Part_ptr;
struct Part : Profile
@@ -1253,7 +1260,7 @@ namespace ACE_RMCast
/*
inline
std::ostream&
-operator<< (std::ostream& os, RMCast::Address const& a)
+operator<< (std::ostream& os, ACE_RMCast::Address const& a)
{
char buf[64];
a.addr_to_string (buf, 64, 1);
diff --git a/protocols/ace/RMCast/Reassemble.cpp b/protocols/ace/RMCast/Reassemble.cpp
index e99b5e9d98d..e4033b1249f 100644
--- a/protocols/ace/RMCast/Reassemble.cpp
+++ b/protocols/ace/RMCast/Reassemble.cpp
@@ -13,7 +13,8 @@ using std::endl;
namespace ACE_RMCast
{
Reassemble::
- Reassemble ()
+ Reassemble (Parameters const& params)
+ : params_ (params)
{
}
@@ -56,6 +57,7 @@ namespace ACE_RMCast
if (part->num () == 1)
abort ();
+
Data const* data = static_cast<Data const*> (m->find (Data::id));
Data_ptr& new_data = e->int_id_;
diff --git a/protocols/ace/RMCast/Reassemble.h b/protocols/ace/RMCast/Reassemble.h
index 0f074c9855c..cffa4fdc359 100644
--- a/protocols/ace/RMCast/Reassemble.h
+++ b/protocols/ace/RMCast/Reassemble.h
@@ -10,19 +10,22 @@
#include "Stack.h"
#include "Protocol.h"
#include "Bits.h"
+#include "Parameters.h"
namespace ACE_RMCast
{
class Reassemble : public Element
{
public:
- Reassemble ();
+ Reassemble (Parameters const& params);
public:
virtual void
recv (Message_ptr m);
private:
+ Parameters const& params_;
+
typedef
ACE_Hash_Map_Manager_Ex<Address,
Data_ptr,
diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp
index 7850df9c6f3..a2f8dd96adc 100644
--- a/protocols/ace/RMCast/Retransmit.cpp
+++ b/protocols/ace/RMCast/Retransmit.cpp
@@ -7,15 +7,18 @@
#include "Retransmit.h"
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
namespace ACE_RMCast
{
- ACE_Time_Value const tick (0, 50000);
- unsigned long const retention_time = 60; // How many ticks to retain.
-
Retransmit::
- Retransmit ()
- : cond_ (mutex_),
- sn_ (1),
+ Retransmit (Parameters const& params)
+ : params_ (params),
+ cond_ (mutex_),
stop_ (false)
{
}
@@ -47,10 +50,10 @@ namespace ACE_RMCast
{
if (m->find (Data::id) != 0)
{
- m->add (Profile_ptr (new SN (sn_)));
+ SN const* sn = static_cast<SN const*> (m->find (SN::id));
Lock l (mutex_);
- queue_.bind (sn_++, Descr (m->clone ()));
+ queue_.bind (sn->num (), Descr (m->clone ()));
}
out_->send (m);
@@ -99,10 +102,8 @@ namespace ACE_RMCast
}
}
}
- else
- {
- in_->recv (m);
- }
+
+ in_->recv (m);
}
ACE_THR_FUNC_RETURN Retransmit::
@@ -121,7 +122,7 @@ namespace ACE_RMCast
for (Queue::iterator i (queue_); !i.done ();)
{
- if ((*i).int_id_.inc () >= retention_time)
+ if ((*i).int_id_.inc () >= params_.retention_timeout ())
{
u64 sn ((*i).ext_id_);
i.advance ();
@@ -136,7 +137,7 @@ namespace ACE_RMCast
// Go to sleep but watch for "manual cancellation" request.
//
ACE_Time_Value time (ACE_OS::gettimeofday ());
- time += tick;
+ time += params_.tick ();
while (!stop_)
{
diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h
index c76299a8d5b..0a3a8f72bd7 100644
--- a/protocols/ace/RMCast/Retransmit.h
+++ b/protocols/ace/RMCast/Retransmit.h
@@ -11,13 +11,14 @@
#include "Stack.h"
#include "Protocol.h"
#include "Bits.h"
+#include "Parameters.h"
namespace ACE_RMCast
{
class Retransmit : public Element
{
public:
- Retransmit ();
+ Retransmit (Parameters const& params);
virtual void
out_start (Out_Element* out);
@@ -82,12 +83,12 @@ namespace ACE_RMCast
track_thunk (void* obj);
private:
+ Parameters const& params_;
+
Queue queue_;
Mutex mutex_;
Condition cond_;
- u64 sn_;
-
bool stop_;
ACE_Thread_Manager tracker_mgr_;
};
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
index 519879ee489..4ed634bd59d 100644
--- a/protocols/ace/RMCast/Socket.cpp
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -19,10 +19,17 @@
#include "Reassemble.h"
#include "Acknowledge.h"
#include "Retransmit.h"
+#include "Flow.h"
#include "Link.h"
#include "Socket.h"
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
namespace ACE_RMCast
{
class Socket_Impl : protected Element
@@ -30,7 +37,7 @@ namespace ACE_RMCast
public:
~Socket_Impl ();
- Socket_Impl (Address const& a, bool loop, bool simulator);
+ Socket_Impl (Address const& a, bool loop, Parameters const& params);
public:
void
@@ -51,6 +58,7 @@ namespace ACE_RMCast
private:
bool loop_;
+ Parameters const params_;
Mutex mutex_;
Condition cond_;
@@ -63,22 +71,25 @@ namespace ACE_RMCast
ACE_Auto_Ptr<Reassemble> reassemble_;
ACE_Auto_Ptr<Acknowledge> acknowledge_;
ACE_Auto_Ptr<Retransmit> retransmit_;
+ ACE_Auto_Ptr<Flow> flow_;
ACE_Auto_Ptr<Link> link_;
};
Socket_Impl::
- Socket_Impl (Address const& a, bool loop, bool simulator)
+ Socket_Impl (Address const& a, bool loop, Parameters const& params)
: loop_ (loop),
+ params_ (params),
cond_ (mutex_)
{
signal_pipe_.open ();
- fragment_.reset (new Fragment ());
- reassemble_.reset (new Reassemble ());
- acknowledge_.reset (new Acknowledge ());
- retransmit_.reset (new Retransmit ());
- link_.reset (new Link (a, simulator));
+ fragment_.reset (new Fragment (params_));
+ reassemble_.reset (new Reassemble (params_));
+ acknowledge_.reset (new Acknowledge (params_));
+ retransmit_.reset (new Retransmit (params_));
+ flow_.reset (new Flow (params_));
+ link_.reset (new Link (a, params_));
// Start IN stack from top to bottom.
//
@@ -87,12 +98,14 @@ namespace ACE_RMCast
reassemble_->in_start (fragment_.get ());
acknowledge_->in_start (reassemble_.get ());
retransmit_->in_start (acknowledge_.get ());
- link_->in_start (retransmit_.get ());
+ flow_->in_start (retransmit_.get ());
+ link_->in_start (flow_.get ());
// Start OUT stack from bottom up.
//
link_->out_start (0);
- retransmit_->out_start (link_.get ());
+ flow_->out_start (link_.get ());
+ retransmit_->out_start (flow_.get ());
acknowledge_->out_start (retransmit_.get ());
reassemble_->out_start (acknowledge_.get ());
fragment_->out_start (reassemble_.get ());
@@ -109,11 +122,13 @@ namespace ACE_RMCast
reassemble_->out_stop ();
acknowledge_->out_stop ();
retransmit_->out_stop ();
+ flow_->out_stop ();
link_->out_stop ();
// Stop IN stack from bottom up.
//
link_->in_stop ();
+ flow_->in_stop ();
retransmit_->in_stop ();
acknowledge_->in_stop ();
reassemble_->in_stop ();
@@ -269,6 +284,9 @@ namespace ACE_RMCast
Lock l (mutex_);
+ //if (queue_.size () != 0)
+ // cerr << "recv socket queue size: " << queue_.size () << endl;
+
bool signal (queue_.is_empty ());
queue_.enqueue_tail (m);
@@ -287,7 +305,6 @@ namespace ACE_RMCast
cond_.signal ();
}
-
}
}
@@ -301,8 +318,8 @@ namespace ACE_RMCast
}
Socket::
- Socket (Address const& a, bool loop, bool simulator)
- : impl_ (new Socket_Impl (a, loop, simulator))
+ Socket (Address const& a, bool loop, Parameters const& params)
+ : impl_ (new Socket_Impl (a, loop, params))
{
}
diff --git a/protocols/ace/RMCast/Socket.h b/protocols/ace/RMCast/Socket.h
index 98e3b8b0ba8..c1a0a26a565 100644
--- a/protocols/ace/RMCast/Socket.h
+++ b/protocols/ace/RMCast/Socket.h
@@ -12,6 +12,7 @@
#include "ace/Time_Value.h"
#include "RMCast_Export.h"
+#include "Parameters.h"
namespace ACE_RMCast
@@ -27,7 +28,9 @@ namespace ACE_RMCast
// If 'simulator' is 'true' then internal message loss and
// reordering simulator (on IPv4 level) is turned on.
//
- Socket (ACE_INET_Addr const& a, bool loop = true, bool simulator = false);
+ Socket (ACE_INET_Addr const& a,
+ bool loop = true,
+ Parameters const& params = Parameters ());
public:
virtual void
diff --git a/protocols/examples/RMCast/Send_Msg/Protocol.h b/protocols/examples/RMCast/Send_Msg/Protocol.h
index 9c7be6eb0c8..88fdb4d6a51 100644
--- a/protocols/examples/RMCast/Send_Msg/Protocol.h
+++ b/protocols/examples/RMCast/Send_Msg/Protocol.h
@@ -5,12 +5,12 @@
#ifndef PROTOCOL_H
#define PROTOCOL_H
-unsigned short const payload_size = 512;
-unsigned long const message_count = 10000;
+unsigned short const payload_size = 702;
+unsigned long const message_count = 80000;
struct Message
{
- unsigned long sn;
+ unsigned int sn;
unsigned short payload[payload_size];
};
diff --git a/protocols/examples/RMCast/Send_Msg/Receiver.cpp b/protocols/examples/RMCast/Send_Msg/Receiver.cpp
index 39808151fe9..f86e95192f6 100644
--- a/protocols/examples/RMCast/Send_Msg/Receiver.cpp
+++ b/protocols/examples/RMCast/Send_Msg/Receiver.cpp
@@ -5,6 +5,8 @@
#include "ace/Vector_T.h"
#include "ace/Log_Msg.h"
#include "ace/OS_NS_string.h"
+#include "ace/Time_Value.h" // ACE_Time_Value
+#include "ace/OS_NS_sys_time.h" // gettimeofday
#include "ace/RMCast/Socket.h"
@@ -47,7 +49,7 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[])
// VC6 does not know about new rules.
//
{
- for (unsigned long i = 0; i < message_count; ++i)
+ for (unsigned int i = 0; i < message_count; ++i)
{
received.push_back (0);
damaged.push_back (0);
@@ -57,11 +59,19 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[])
Message msg;
+ bool first (true);
+ ACE_Time_Value start_time, time;
while (true)
{
ssize_t s = socket.size ();
+ if (first)
+ {
+ start_time = ACE_OS::gettimeofday ();
+ first = false;
+ }
+
if (s == -1 && errno == ENOENT)
{
ACE_ERROR ((LM_ERROR, "unavailable message detected\n"));
@@ -106,7 +116,9 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[])
if (msg.sn + 1 == message_count) break;
}
- unsigned long lost_count (0), damaged_count (0), duplicate_count (0);
+ time = ACE_OS::gettimeofday () - start_time;
+
+ unsigned int lost_count (0), damaged_count (0), duplicate_count (0);
{
for (Status_List::Iterator i (received); !i.done (); i.advance ())
@@ -140,13 +152,18 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[])
}
}
+ unsigned long tput =
+ (sizeof (msg) * message_count) / (time.msec () == 0 ? 1 : time.msec ());
+
ACE_DEBUG ((LM_DEBUG,
- "lost : %d\n"
- "damaged : %d\n"
- "duplicate : %d\n",
+ "lost : %d\n"
+ "damaged : %d\n"
+ "duplicate : %d\n"
+ "throughput : %d KB/sec\n",
lost_count,
damaged_count,
- duplicate_count));
+ duplicate_count,
+ tput));
/*
cout << "lost message dump:" << endl;