1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
// author : Boris Kolpackov <boris@kolpackov.net>
// $Id$
#include "Flow.h"
#include "ace/ACE.h"
#include "ace/OS_NS_unistd.h" // sleep
#include "ace/OS_NS_sys_time.h" // gettimeofday
#include "ace/os_include/os_math.h" // exp
namespace ACE_RMCast
{
Flow::
Flow (Parameters const& )
: //params_ (params),
nak_time_ (0, 0),
sample_start_time_ (0, 0),
sample_bytes_ (0),
current_tput_ (0.0),
cap_tput_ (0.0)
{
}
void Flow::send (Message_ptr m)
{
if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
ACE_Time_Value now_time (ACE_OS::gettimeofday ());
Lock l (mutex_);
sample_bytes_ += data->size ();
if (sample_start_time_ == ACE_Time_Value (0, 0))
{
sample_start_time_ = now_time;
}
else
{
ACE_Time_Value delta (now_time - sample_start_time_);
if (delta > ACE_Time_Value (0, 2000))
{
current_tput_ =
double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
// cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
sample_bytes_ = 0;
sample_start_time_ = ACE_Time_Value (0, 0);
}
}
if (!ACE::is_equal (cap_tput_, 0.0) &&
!ACE::is_equal (current_tput_, 0.0) &&
current_tput_ > cap_tput_)
{
double dev = (current_tput_ - cap_tput_) / current_tput_;
// cerr << "deviation: " << dev << endl;
// Cap decay algorithm.
//
{
ACE_Time_Value delta (now_time - nak_time_);
unsigned long msec = delta.msec ();
double x = msec / -16000.0;
double y = 1.0 * exp (x);
cap_tput_ = cap_tput_ / y;
// cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
}
l.release ();
timespec time;
time.tv_sec = 0;
time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
// Don't bother to sleep if the time is less than 10 usec.
//
if (time.tv_nsec > 10000)
ACE_OS::sleep (ACE_Time_Value (time));
}
}
out_->send (m);
}
void Flow::recv (Message_ptr m)
{
if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
{
Address to (static_cast<To const*> (m->find (To::id))->address ());
if (nak->address () == to)
{
// This one is for us.
//
//cerr << "NAK from "
// << static_cast<From const*> (m->find (From::id))->address ()
// << " for " << nak->count () << " sns." << endl;
ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
Lock l (mutex_);
nak_time_ = nak_time;
if (ACE::is_equal (cap_tput_, 0.0))
cap_tput_ = current_tput_;
if (!ACE::is_equal (cap_tput_, 0.0))
{
cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
// cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
}
}
}
in_->recv (m);
}
}
|